Changeset View
Standalone View
pykolab/imap/__init__.py
Show All 37 Lines | class IMAP(object): | ||||
def __init__(self): | def __init__(self): | ||||
# Pool of named IMAP connections, by hostname | # Pool of named IMAP connections, by hostname | ||||
self._imap = {} | self._imap = {} | ||||
# Place holder for the current IMAP connection | # Place holder for the current IMAP connection | ||||
self.imap = None | self.imap = None | ||||
def cleanup_acls(self, aci_subject): | def cleanup_acls(self, aci_subject): | ||||
lm_suffix = "" | |||||
log.info( | log.info( | ||||
_("Cleaning up ACL entries for %s across all folders") % ( | _("Cleaning up ACL entries for %s across all folders") % ( | ||||
aci_subject | aci_subject | ||||
) | ) | ||||
) | ) | ||||
lm_suffix = "" | |||||
if len(aci_subject.split('@')) > 1: | if len(aci_subject.split('@')) > 1: | ||||
lm_suffix = "@%s" % (aci_subject.split('@')[1]) | lm_suffix = "@%s" % (aci_subject.split('@')[1]) | ||||
shared_folders = self.imap.lm( | shared_folders = self.imap.lm( | ||||
"shared/*%s" % (lm_suffix) | "shared/*%s" % (lm_suffix) | ||||
) | ) | ||||
user_folders = self.imap.lm( | user_folders = self.imap.lm( | ||||
"user/*%s" % (lm_suffix) | "user/*%s" % (lm_suffix) | ||||
) | ) | ||||
log.debug( | |||||
_("Cleaning up ACL entries referring to identifier %s") % ( | |||||
aci_subject | |||||
), | |||||
level=5 | |||||
) | |||||
# For all folders (shared and user), ... | # For all folders (shared and user), ... | ||||
folders = user_folders + shared_folders | folders = user_folders + shared_folders | ||||
log.debug(_("Iterating over %d folders") % (len(folders)), level=5) | log.debug(_("Iterating over %d folders") % (len(folders)), level=5) | ||||
# ... loop through them and ... | # ... loop through them and ... | ||||
for folder in folders: | for folder in folders: | ||||
# ... list the ACL entries -- but only if the folder still exists | try { | ||||
vanmeeuwenUnsubmitted Done Inline Actionsvanmeeuwen: ```
try:
``` | |||||
if self.imap.has_folder(folder): | # ... list the ACL entries | ||||
acls = self.imap.lam(folder) | acls = self.imap.lam(folder) | ||||
# For each ACL entry, see if we think it is a current, valid | # For each ACL entry, see if we think it is a current, valid | ||||
# entry | # entry | ||||
for acl_entry in acls.keys(): | for acl_entry in acls.keys(): | ||||
# If the key 'acl_entry' does not exist in the dictionary | # If the key 'acl_entry' does not exist in the dictionary | ||||
# of valid ACL entries, this ACL entry has got to go. | # of valid ACL entries, this ACL entry has got to go. | ||||
if acl_entry == aci_subject: | if acl_entry == aci_subject: | ||||
# Set the ACL to '' (effectively deleting the ACL | # Set the ACL to '' (effectively deleting the ACL | ||||
# entry) | # entry) | ||||
log.debug( | log.debug( | ||||
_( | _( | ||||
"Removing acl %r for subject %r from folder %r" | "Removing acl %r for subject %r from folder %r" | ||||
) % ( | ) % ( | ||||
acls[acl_entry], | acls[acl_entry], | ||||
acl_entry, | acl_entry, | ||||
folder | folder | ||||
), | ), | ||||
level=8 | level=8 | ||||
) | ) | ||||
self.set_acl(folder, acl_entry, '') | self.set_acl(folder, acl_entry, '') | ||||
else: | |||||
log.debug( | except Exception, errmsg: | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
_("Folder %r disappeared (ACL cleanup for %r") % ( | log.error( | ||||
_("Failed to read/set ACL on folder %s: %r") % ( | |||||
folder, | folder, | ||||
aci_subject | errmsg | ||||
), | ) | ||||
level=8 | |||||
) | ) | ||||
def connect(self, uri=None, server=None, domain=None, login=True): | def connect(self, uri=None, server=None, domain=None, login=True): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Connect to the appropriate IMAP backend. | Connect to the appropriate IMAP backend. | ||||
Supply a domain (name space) configured in the configuration file | Supply a domain (name space) configured in the configuration file | ||||
as a section, with a setting 'imap_uri' to connect to a domain | as a section, with a setting 'imap_uri' to connect to a domain | ||||
specific IMAP server, or specify an URI to connect to that | specific IMAP server, or specify an URI to connect to that | ||||
particular IMAP server (in that order). | particular IMAP server (in that order). | ||||
▲ Show 20 Lines • Show All 116 Lines • ▼ Show 20 Lines | def connect(self, uri=None, server=None, domain=None, login=True): | ||||
# IMAP connection to be used. | # IMAP connection to be used. | ||||
self.imap = self._imap[hostname] | self.imap = self._imap[hostname] | ||||
if hasattr(self.imap, 'm') and hasattr(self.imap.m, 'sock'): | if hasattr(self.imap, 'm') and hasattr(self.imap.m, 'sock'): | ||||
self._set_socket_keepalive(self.imap.m.sock) | self._set_socket_keepalive(self.imap.m.sock) | ||||
elif hasattr(self.imap, 'sock'): | elif hasattr(self.imap, 'sock'): | ||||
self._set_socket_keepalive(self.imap.sock) | self._set_socket_keepalive(self.imap.sock) | ||||
def disconnect(self, server=None): | def disconnect(self, server=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
if server == None: | if server == None: | ||||
# No server specified, but make sure self.imap is None anyways | # No server specified, but make sure self.imap is None anyways | ||||
if hasattr(self, 'imap'): | if hasattr(self, 'imap'): | ||||
del self.imap | del self.imap | ||||
# Empty out self._imap as well | # Empty out self._imap as well | ||||
for key in self._imap.keys(): | for key in self._imap.keys(): | ||||
del self._imap[key] | del self._imap[key] | ||||
else: | else: | ||||
if self._imap.has_key(server): | if self._imap.has_key(server): | ||||
del self._imap[server] | del self._imap[server] | ||||
else: | else: | ||||
log.warning(_("Called imap.disconnect() on a server that we had no connection to.")) | log.warning(_("Called imap.disconnect() on a server that we had no connection to.")) | ||||
def create_folder(self, folder_path, server=None, partition=None): | def create_folder(self, folder_path, server=None, partition=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
folder_path = self.folder_utf7(folder_path) | folder_path = self.folder_utf7(folder_path) | ||||
if not server == None: | if not server == None: | ||||
self.connect(server=server) | self.connect(server=server) | ||||
try: | try: | ||||
self._imap[server].cm(folder_path, partition=partition) | self._imap[server].cm(folder_path, partition=partition) | ||||
return True | return True | ||||
except: | except: | ||||
log.error( | log.error( | ||||
_("Could not create folder %r on server %r") % ( | _("Could not create folder %r on server %r") % ( | ||||
folder_path, | folder_path, | ||||
server | server | ||||
) | ) | ||||
) | ) | ||||
else: | else: | ||||
try: | try: | ||||
self.imap.cm(folder_path, partition=partition) | self.imap.cm(folder_path, partition=partition) | ||||
return True | return True | ||||
except: | except: | ||||
log.error(_("Could not create folder %r") % (folder_path)) | log.error(_("Could not create folder %r") % (folder_path)) | ||||
return False | return False | ||||
def __getattr__(self, name): | def __getattr__(self, name): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
if hasattr(self.imap, name): | if hasattr(self.imap, name): | ||||
return getattr(self.imap, name) | return getattr(self.imap, name) | ||||
elif hasattr(self.imap, 'm'): | elif hasattr(self.imap, 'm'): | ||||
if hasattr(self.imap.m, name): | if hasattr(self.imap.m, name): | ||||
return getattr(self.imap.m, name) | return getattr(self.imap.m, name) | ||||
else: | else: | ||||
raise AttributeError, _("%r has no attribute %s") % (self,name) | raise AttributeError, _("%r has no attribute %s") % (self,name) | ||||
else: | else: | ||||
raise AttributeError, _("%r has no attribute %s") % (self,name) | raise AttributeError, _("%r has no attribute %s") % (self,name) | ||||
def folder_utf7(self, folder): | def folder_utf7(self, folder): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
from pykolab import imap_utf7 | from pykolab import imap_utf7 | ||||
return imap_utf7.encode(folder) | return imap_utf7.encode(folder) | ||||
def folder_utf8(self, folder): | def folder_utf8(self, folder): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
from pykolab import imap_utf7 | from pykolab import imap_utf7 | ||||
return imap_utf7.decode(folder) | return imap_utf7.decode(folder) | ||||
def folder_quote(self, folder): | def folder_quote(self, folder): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
return u'"' + str(folder).strip('"') + '"' | return u'"' + str(folder).strip('"') + '"' | ||||
def get_metadata(self, folder): | def get_metadata(self, folder): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Obtain all metadata entries on a folder | Obtain all metadata entries on a folder | ||||
""" | """ | ||||
metadata = {} | metadata = {} | ||||
_metadata = self.imap.getannotation(self.folder_utf7(folder), '*') | _metadata = self.imap.getannotation(self.folder_utf7(folder), '*') | ||||
for (k,v) in _metadata.items(): | for (k,v) in _metadata.items(): | ||||
metadata[self.folder_utf8(k)] = v | metadata[self.folder_utf8(k)] = v | ||||
return metadata | return metadata | ||||
def get_separator(self): | def get_separator(self): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
if not hasattr(self, 'imap') or self.imap == None: | if not hasattr(self, 'imap') or self.imap == None: | ||||
self.connect() | self.connect() | ||||
if hasattr(self.imap, 'separator'): | if hasattr(self.imap, 'separator'): | ||||
return self.imap.separator | return self.imap.separator | ||||
elif hasattr(self.imap, 'm') and hasattr(self.imap.m, 'separator'): | elif hasattr(self.imap, 'm') and hasattr(self.imap.m, 'separator'): | ||||
return self.imap.m.separator | return self.imap.m.separator | ||||
else: | else: | ||||
return '/' | return '/' | ||||
def imap_murder(self): | def imap_murder(self): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
if hasattr(self.imap, 'murder') and self.imap.murder: | if hasattr(self.imap, 'murder') and self.imap.murder: | ||||
return True | return True | ||||
else: | else: | ||||
return False | return False | ||||
def namespaces(self): | def namespaces(self): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Obtain the namespaces. | Obtain the namespaces. | ||||
Returns a tuple of: | Returns a tuple of: | ||||
(str(personal) [, str(other users) [, list(shared)]]) | (str(personal) [, str(other users) [, list(shared)]]) | ||||
""" | """ | ||||
Show All 15 Lines | def namespaces(self): | ||||
if len(_namespaces) >= 2: | if len(_namespaces) >= 2: | ||||
_other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '') | _other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '') | ||||
if len(_namespaces) >= 1: | if len(_namespaces) >= 1: | ||||
_personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '') | _personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '') | ||||
return (_personal, _other_users, _shared) | return (_personal, _other_users, _shared) | ||||
def set_acl(self, folder, identifier, acl): | def set_acl(self, folder, identifier, acl): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Set an ACL entry on a folder. | Set an ACL entry on a folder. | ||||
""" | """ | ||||
_acl = '' | _acl = '' | ||||
short_rights = { | short_rights = { | ||||
'all': 'lrsedntxakcpiw', | 'all': 'lrsedntxakcpiw', | ||||
'append': 'wip', | 'append': 'wip', | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def set_acl(self, folder, identifier, acl): | ||||
log.error( | log.error( | ||||
_("Could not set ACL for %s on folder %s: %r") % ( | _("Could not set ACL for %s on folder %s: %r") % ( | ||||
identifier, | identifier, | ||||
folder, | folder, | ||||
errmsg | errmsg | ||||
) | ) | ||||
) | ) | ||||
def set_metadata(self, folder, metadata_path, metadata_value, shared=True): | def set_metadata(self, folder, metadata_path, metadata_value, shared=True): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Set a metadata entry on a folder | Set a metadata entry on a folder | ||||
""" | """ | ||||
if metadata_path.startswith('/shared/'): | if metadata_path.startswith('/shared/'): | ||||
shared = True | shared = True | ||||
metadata_path = metadata_path.replace('/shared/', '/') | metadata_path = metadata_path.replace('/shared/', '/') | ||||
elif metadata_path.startswith('/private/'): | elif metadata_path.startswith('/private/'): | ||||
shared = False | shared = False | ||||
metadata_path = metadata_path.replace('/private/', '/') | metadata_path = metadata_path.replace('/private/', '/') | ||||
self.imap._setannotation(self.folder_utf7(folder), metadata_path, metadata_value, shared) | self.imap._setannotation(self.folder_utf7(folder), metadata_path, metadata_value, shared) | ||||
def shared_folder_create(self, folder_path, server=None): | def shared_folder_create(self, folder_path, server=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Create a shared folder. | Create a shared folder. | ||||
""" | """ | ||||
folder_name = "shared%s%s" % (self.get_separator(), folder_path) | folder_name = "shared%s%s" % (self.get_separator(), folder_path) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
log.info(_("Creating new shared folder %s") %(folder_name)) | log.info(_("Creating new shared folder %s") %(folder_name)) | ||||
self.create_folder(folder_name, server) | self.create_folder(folder_name, server) | ||||
def shared_folder_exists(self, folder_path): | def shared_folder_exists(self, folder_path): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Check if a shared mailbox exists. | Check if a shared mailbox exists. | ||||
""" | """ | ||||
folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
return self.has_folder(folder_name) | return self.has_folder(folder_name) | ||||
def shared_folder_set_type(self, folder_path, folder_type): | def shared_folder_set_type(self, folder_path, folder_type): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
self.set_metadata(folder_name, '/shared/vendor/kolab/folder-type', folder_type) | self.set_metadata(folder_name, '/shared/vendor/kolab/folder-type', folder_type) | ||||
def shared_mailbox_create(self, mailbox_base_name, server=None): | def shared_mailbox_create(self, mailbox_base_name, server=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Create a shared folder. | Create a shared folder. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
log.info(_("Creating new shared folder %s") %(mailbox_base_name)) | log.info(_("Creating new shared folder %s") %(mailbox_base_name)) | ||||
self.create_folder(folder_name, server) | self.create_folder(folder_name, server) | ||||
def shared_mailbox_exists(self, mailbox_base_name): | def shared_mailbox_exists(self, mailbox_base_name): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Check if a shared mailbox exists. | Check if a shared mailbox exists. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
return self.has_folder(folder_name) | return self.has_folder(folder_name) | ||||
def user_mailbox_create(self, mailbox_base_name, server=None): | def user_mailbox_create(self, mailbox_base_name, server=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Create a user mailbox. | Create a user mailbox. | ||||
Returns the full path to the new mailbox folder. | Returns the full path to the new mailbox folder. | ||||
""" | """ | ||||
# TODO: Whether or not to lowercase the mailbox name is really up to the | # TODO: Whether or not to lowercase the mailbox name is really up to the | ||||
# IMAP server setting username_tolower (normalize_uid, lmtp_downcase_rcpt). | # IMAP server setting username_tolower (normalize_uid, lmtp_downcase_rcpt). | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | def user_mailbox_create(self, mailbox_base_name, server=None): | ||||
'sieve_mgmt_refresh', | 'sieve_mgmt_refresh', | ||||
kw={ | kw={ | ||||
'user': mailbox_base_name | 'user': mailbox_base_name | ||||
} | } | ||||
) | ) | ||||
return folder_name | return folder_name | ||||
def user_mailbox_create_additional_folders(self, user, additional_folders): | def user_mailbox_create_additional_folders(self, user, additional_folders): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
log.debug( | log.debug( | ||||
_("Creating additional folders for user %s") % (user), | _("Creating additional folders for user %s") % (user), | ||||
level=8 | level=8 | ||||
) | ) | ||||
backend = conf.get('kolab', 'imap_backend') | backend = conf.get('kolab', 'imap_backend') | ||||
admin_login = conf.get(backend, 'admin_login') | admin_login = conf.get(backend, 'admin_login') | ||||
▲ Show 20 Lines • Show All 141 Lines • ▼ Show 20 Lines | def user_mailbox_create_additional_folders(self, user, additional_folders): | ||||
if additional_folders[additional_folder].has_key("partition"): | if additional_folders[additional_folder].has_key("partition"): | ||||
partition = additional_folders[additional_folder]["partition"] | partition = additional_folders[additional_folder]["partition"] | ||||
try: | try: | ||||
self.imap._rename(folder_name, folder_name, partition) | self.imap._rename(folder_name, folder_name, partition) | ||||
except: | except: | ||||
log.error(_("Could not rename %s to reside on partition %s") % (folder_name, partition)) | log.error(_("Could not rename %s to reside on partition %s") % (folder_name, partition)) | ||||
def _create_folder_waiting(self, folder_name, server=None): | def _create_folder_waiting(self, folder_name, server=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Create a folder and wait to make sure it exists | Create a folder and wait to make sure it exists | ||||
""" | """ | ||||
created = False | created = False | ||||
try: | try: | ||||
max_tries = 10 | max_tries = 10 | ||||
Show All 27 Lines | def _create_folder_waiting(self, folder_name, server=None): | ||||
time.sleep(0.5) | time.sleep(0.5) | ||||
except: | except: | ||||
if conf.debuglevel > 8: | if conf.debuglevel > 8: | ||||
import traceback | import traceback | ||||
traceback.print_exc() | traceback.print_exc() | ||||
return created | return created | ||||
def user_mailbox_delete(self, mailbox_base_name): | def user_mailbox_delete(self, mailbox_base_name): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Delete a user mailbox. | Delete a user mailbox. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
folder = "user%s%s" %(self.get_separator(),mailbox_base_name) | folder = "user%s%s" %(self.get_separator(),mailbox_base_name) | ||||
self.delete_mailfolder(folder) | self.delete_mailfolder(folder) | ||||
self.cleanup_acls(mailbox_base_name) | self.cleanup_acls(mailbox_base_name) | ||||
def user_mailbox_exists(self, mailbox_base_name): | def user_mailbox_exists(self, mailbox_base_name): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Check if a user mailbox exists. | Check if a user mailbox exists. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
if not mailbox_base_name == mailbox_base_name.lower(): | if not mailbox_base_name == mailbox_base_name.lower(): | ||||
log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) | log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) | ||||
mailbox_base_name = mailbox_base_name.lower() | mailbox_base_name = mailbox_base_name.lower() | ||||
return self.has_folder('user%s%s' %(self.get_separator(), mailbox_base_name)) | return self.has_folder('user%s%s' %(self.get_separator(), mailbox_base_name)) | ||||
def user_mailbox_quota(self, mailbox_quota): | def user_mailbox_quota(self, mailbox_quota): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
pass | pass | ||||
def user_mailbox_rename(self, old_name, new_name, partition=None): | def user_mailbox_rename(self, old_name, new_name, partition=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
self.connect() | self.connect() | ||||
old_name = "user%s%s" % (self.get_separator(),old_name) | old_name = "user%s%s" % (self.get_separator(),old_name) | ||||
new_name = "user%s%s" % (self.get_separator(),new_name) | new_name = "user%s%s" % (self.get_separator(),new_name) | ||||
if old_name == new_name and partition == None: | if old_name == new_name and partition == None: | ||||
return | return | ||||
if not self.has_folder(old_name): | if not self.has_folder(old_name): | ||||
log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) | log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) | ||||
if not self.has_folder(new_name) or not partition == None: | if not self.has_folder(new_name) or not partition == None: | ||||
log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name)) | log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name)) | ||||
try: | try: | ||||
self.imap.rename(old_name,new_name,partition) | self.imap.rename(old_name,new_name,partition) | ||||
except: | except: | ||||
log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name)) | log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name)) | ||||
else: | else: | ||||
log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name)) | log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name)) | ||||
def user_mailbox_server(self, mailbox): | def user_mailbox_server(self, mailbox): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
server = self.imap.find_mailfolder_server(mailbox.lower()).lower() | server = self.imap.find_mailfolder_server(mailbox.lower()).lower() | ||||
log.debug(_("Server for mailbox %r is %r") % (mailbox, server), level=8) | log.debug(_("Server for mailbox %r is %r") % (mailbox, server), level=8) | ||||
return server | return server | ||||
def has_folder(self, folder): | def has_folder(self, folder): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Check if the environment has a folder named folder. | Check if the environment has a folder named folder. | ||||
""" | """ | ||||
folders = self.imap.lm(self.folder_utf7(folder)) | folders = self.imap.lm(self.folder_utf7(folder)) | ||||
log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8) | log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8) | ||||
# Greater then one, this folder may have subfolders. | # Greater then one, this folder may have subfolders. | ||||
if len(folders) > 0: | if len(folders) > 0: | ||||
return True | return True | ||||
else: | else: | ||||
return False | return False | ||||
def _set_socket_keepalive(self, sock): | def _set_socket_keepalive(self, sock): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||||
with open('/proc/sys/net/ipv4/tcp_keepalive_time', 'r') as f: | with open('/proc/sys/net/ipv4/tcp_keepalive_time', 'r') as f: | ||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, (int)(f.read())) | sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, (int)(f.read())) | ||||
with open('/proc/sys/net/ipv4/tcp_keepalive_intvl', 'r') as f: | with open('/proc/sys/net/ipv4/tcp_keepalive_intvl', 'r') as f: | ||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, (int)(f.read())) | sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, (int)(f.read())) | ||||
with open('/proc/sys/net/ipv4/tcp_keepalive_probes', 'r') as f: | with open('/proc/sys/net/ipv4/tcp_keepalive_probes', 'r') as f: | ||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, (int)(f.read())) | sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, (int)(f.read())) | ||||
def _set_kolab_mailfolder_acls(self, acls, folder=None, update=False): | def _set_kolab_mailfolder_acls(self, acls, folder=None, update=False): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
# special case, folder has no ACLs assigned and update was requested, | # special case, folder has no ACLs assigned and update was requested, | ||||
# remove all existing ACL entries | # remove all existing ACL entries | ||||
if update is True and isinstance(acls, list) and len(acls) == 0: | if update is True and isinstance(acls, list) and len(acls) == 0: | ||||
acls = self.list_acls(folder) | acls = self.list_acls(folder) | ||||
for subject in acls: | for subject in acls: | ||||
log.debug( | log.debug( | ||||
_("Removing ACL rights %s for subject %s on folder " + \ | _("Removing ACL rights %s for subject %s on folder " + \ | ||||
"%s") % (acls[subject], subject, folder), level=8) | "%s") % (acls[subject], subject, folder), level=8) | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def _set_kolab_mailfolder_acls(self, acls, folder=None, update=False): | ||||
if old_acls[subject] is not None: | if old_acls[subject] is not None: | ||||
log.debug( | log.debug( | ||||
_("Removing ACL rights %s for subject %s on folder " + \ | _("Removing ACL rights %s for subject %s on folder " + \ | ||||
"%s") % (old_acls[subject], subject, folder), level=8) | "%s") % (old_acls[subject], subject, folder), level=8) | ||||
self.set_acl(folder, subject, '') | self.set_acl(folder, subject, '') | ||||
pass | pass | ||||
""" Blah functions """ | """ Blah functions """ | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
def move_user_folders(self, users=[], domain=None): | def move_user_folders(self, users=[], domain=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
for user in users: | for user in users: | ||||
if type(user) == dict: | if type(user) == dict: | ||||
if user.has_key('old_mail'): | if user.has_key('old_mail'): | ||||
inbox = "user/%s" % (user['mail']) | inbox = "user/%s" % (user['mail']) | ||||
old_inbox = "user/%s" % (user['old_mail']) | old_inbox = "user/%s" % (user['old_mail']) | ||||
if self.has_folder(old_inbox): | if self.has_folder(old_inbox): | ||||
log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) | log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) | ||||
if not self.has_folder(inbox): | if not self.has_folder(inbox): | ||||
log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) | log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) | ||||
self.imap.rename(old_inbox,inbox) | self.imap.rename(old_inbox,inbox) | ||||
self.inbox_folders.append(inbox) | self.inbox_folders.append(inbox) | ||||
else: | else: | ||||
log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) | log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) | ||||
else: | else: | ||||
log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) | log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) | ||||
else: | else: | ||||
log.debug(_("Value for user is not a dictionary"), level=8) | log.debug(_("Value for user is not a dictionary"), level=8) | ||||
def set_quota(self, folder, quota): | def set_quota(self, folder, quota): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
i = 0 | i = 0 | ||||
while i < 10: | while i < 10: | ||||
try: | try: | ||||
self.imap._setquota(folder, quota) | self.imap._setquota(folder, quota) | ||||
i = 10 | i = 10 | ||||
except: | except: | ||||
self.disconnect() | self.disconnect() | ||||
self.connect() | self.connect() | ||||
i += 1 | i += 1 | ||||
def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Sets the quota in IMAP using the authentication and authorization | Sets the quota in IMAP using the authentication and authorization | ||||
database 'quota' attribute for the users listed in parameter 'users' | database 'quota' attribute for the users listed in parameter 'users' | ||||
""" | """ | ||||
if conf.has_option(primary_domain, 'quota_attribute'): | if conf.has_option(primary_domain, 'quota_attribute'): | ||||
_quota_attr = conf.get(primary_domain, 'quota_attribute') | _quota_attr = conf.get(primary_domain, 'quota_attribute') | ||||
else: | else: | ||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | ||||
log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota))) | log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota))) | ||||
quota = int(new_quota) | quota = int(new_quota) | ||||
auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota) | auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota) | ||||
if not int(current_quota) == int(quota): | if not int(current_quota) == int(quota): | ||||
log.info(_("Correcting quota for %s to %s (currently %s)") % (folder, quota, current_quota)) | log.info(_("Correcting quota for %s to %s (currently %s)") % (folder, quota, current_quota)) | ||||
self.imap._setquota(folder, quota) | self.imap._setquota(folder, quota) | ||||
def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
if conf.has_option(primary_domain, 'mailserver_attribute'): | if conf.has_option(primary_domain, 'mailserver_attribute'): | ||||
_mailserver_attr = conf.get(primary_domain, 'mailserver_attribute') | _mailserver_attr = conf.get(primary_domain, 'mailserver_attribute') | ||||
else: | else: | ||||
auth_mechanism = conf.get('kolab', 'auth_mechanism') | auth_mechanism = conf.get('kolab', 'auth_mechanism') | ||||
_mailserver_attr = conf.get(auth_mechanism, 'mailserver_attribute') | _mailserver_attr = conf.get(auth_mechanism, 'mailserver_attribute') | ||||
_inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') | _inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') | ||||
Show All 30 Lines | def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | ||||
if not _mailserver == None: | if not _mailserver == None: | ||||
# TODO: | # TODO: | ||||
if not _current_mailserver == _mailserver: | if not _current_mailserver == _mailserver: | ||||
self.imap._xfer(folder, _current_mailserver, _mailserver) | self.imap._xfer(folder, _current_mailserver, _mailserver) | ||||
else: | else: | ||||
auth.set_user_attribute(primary_domain, user, _mailserver_attr, _current_mailserver) | auth.set_user_attribute(primary_domain, user, _mailserver_attr, _current_mailserver) | ||||
def parse_mailfolder(self, mailfolder): | def parse_mailfolder(self, mailfolder): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
return self.imap.parse_mailfolder(mailfolder) | return self.imap.parse_mailfolder(mailfolder) | ||||
def expunge_user_folders(self, inbox_folders=None): | def expunge_user_folders(self, inbox_folders=None): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Delete folders that have no equivalent user qualifier in the list | Delete folders that have no equivalent user qualifier in the list | ||||
of users passed to this function, ... | of users passed to this function, ... | ||||
TODO: Explain the domain parameter, and actually get it to work | TODO: Explain the domain parameter, and actually get it to work | ||||
properly. This also relates to threading for multi-domain | properly. This also relates to threading for multi-domain | ||||
deployments. | deployments. | ||||
Show All 21 Lines | def expunge_user_folders(self, inbox_folders=None): | ||||
self.delete_mailfolder("user/%s" % (folder)) | self.delete_mailfolder("user/%s" % (folder)) | ||||
except: | except: | ||||
log.info(_("Folder has no corresponding user (2): %s") % (folder)) | log.info(_("Folder has no corresponding user (2): %s") % (folder)) | ||||
try: | try: | ||||
self.delete_mailfolder("user/%s" % (folder)) | self.delete_mailfolder("user/%s" % (folder)) | ||||
except: | except: | ||||
pass | pass | ||||
def delete_mailfolder(self, mailfolder_path): | def delete_mailfolder(self, mailfolder_path): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
Deletes a mail folder described by mailfolder_path. | Deletes a mail folder described by mailfolder_path. | ||||
""" | """ | ||||
mbox_parts = self.parse_mailfolder(mailfolder_path) | mbox_parts = self.parse_mailfolder(mailfolder_path) | ||||
if mbox_parts == None: | if mbox_parts == None: | ||||
# We got user identifier only | # We got user identifier only | ||||
log.error(_("Please don't give us just a user identifier")) | log.error(_("Please don't give us just a user identifier")) | ||||
return | return | ||||
log.info(_("Deleting folder %s") % (mailfolder_path)) | log.info(_("Deleting folder %s") % (mailfolder_path)) | ||||
self.imap.dm(self.folder_utf7(mailfolder_path)) | self.imap.dm(self.folder_utf7(mailfolder_path)) | ||||
def get_quota(self, mailfolder_path): | def get_quota(self, mailfolder_path): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
try: | try: | ||||
return self.lq(self.folder_utf7(mailfolder_path)) | return self.lq(self.folder_utf7(mailfolder_path)) | ||||
except: | except: | ||||
return | return | ||||
def get_quota_root(self, mailfolder_path): | def get_quota_root(self, mailfolder_path): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
return self.lqr(self.folder_utf7(mailfolder_path)) | return self.lqr(self.folder_utf7(mailfolder_path)) | ||||
def list_acls(self, folder): | def list_acls(self, folder): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
List the ACL entries on a folder | List the ACL entries on a folder | ||||
""" | """ | ||||
return self.imap.lam(self.folder_utf7(folder)) | return self.imap.lam(self.folder_utf7(folder)) | ||||
def list_folders(self, pattern): | def list_folders(self, pattern): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
return [self.folder_utf8(x) for x in self.lm(self.folder_utf7(pattern))] | return [self.folder_utf8(x) for x in self.lm(self.folder_utf7(pattern))] | ||||
def list_user_folders(self, primary_domain=None, secondary_domains=[]): | def list_user_folders(self, primary_domain=None, secondary_domains=[]): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
""" | """ | ||||
List the INBOX folders in the IMAP backend. Returns a list of unique | List the INBOX folders in the IMAP backend. Returns a list of unique | ||||
base folder names. | base folder names. | ||||
""" | """ | ||||
_folders = self.imap.lm("user/%") | _folders = self.imap.lm("user/%") | ||||
# TODO: Replace the .* below with a regex representing acceptable DNS | # TODO: Replace the .* below with a regex representing acceptable DNS | ||||
# domain names. | # domain names. | ||||
domain_re = ".*\.?%s$" | domain_re = ".*\.?%s$" | ||||
Show All 27 Lines | def list_user_folders(self, primary_domain=None, secondary_domains=[]): | ||||
folder_name = "%s" % (folder.split(self.get_separator())[1]) | folder_name = "%s" % (folder.split(self.get_separator())[1]) | ||||
if not folder_name == None: | if not folder_name == None: | ||||
if not folder_name in folders: | if not folder_name in folders: | ||||
folders.append(folder_name) | folders.append(folder_name) | ||||
return folders | return folders | ||||
def lm(self, *args, **kw): | def lm(self, *args, **kw): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
return self.imap.lm(*args, **kw) | return self.imap.lm(*args, **kw) | ||||
def lq(self, *args, **kw): | def lq(self, *args, **kw): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
return self.imap.lq(*args, **kw) | return self.imap.lq(*args, **kw) | ||||
def lqr(self, *args, **kw): | def lqr(self, *args, **kw): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
try: | try: | ||||
return self.imap.lqr(*args, **kw) | return self.imap.lqr(*args, **kw) | ||||
except: | except: | ||||
return (None, None, None) | return (None, None, None) | ||||
def undelete_mailfolder(self, *args, **kw): | def undelete_mailfolder(self, *args, **kw): | ||||
Lint: PEP8 E122 continuation line missing indentation or outdented Lint: PEP8 E122: continuation line missing indentation or outdented | |||||
self.imap.undelete_mailfolder(*args, **kw) | self.imap.undelete_mailfolder(*args, **kw) | ||||
Lint: PEP8 E901 TokenError: EOF in multi-line statement Lint: PEP8 E901: TokenError: EOF in multi-line statement |