Changeset View
Changeset View
Standalone View
Standalone View
pykolab/imap/__init__.py
Show First 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | def cleanup_acls(self, aci_subject): | ||||
# ... loop through them and ... | # ... loop through them and ... | ||||
for folder in folders: | for folder in folders: | ||||
try: | try: | ||||
# ... list the ACL entries | # ... list the ACL entries | ||||
acls = self.imap.lam(folder) | acls = self.imap.lam(folder) | ||||
# For each ACL entry, see if we think it is a current, valid | # For each ACL entry, see if we think it is a current, valid | ||||
# entry | # entry | ||||
for acl_entry in acls.keys(): | for acl_entry in acls: | ||||
# If the key 'acl_entry' does not exist in the dictionary | # If the key 'acl_entry' does not exist in the dictionary | ||||
# of valid ACL entries, this ACL entry has got to go. | # of valid ACL entries, this ACL entry has got to go. | ||||
if acl_entry == aci_subject: | if acl_entry == aci_subject: | ||||
# Set the ACL to '' (effectively deleting the ACL | # Set the ACL to '' (effectively deleting the ACL | ||||
# entry) | # entry) | ||||
log.debug( | log.debug( | ||||
_( | _( | ||||
"Removing acl %r for subject %r from folder %r" | "Removing acl %r for subject %r from folder %r" | ||||
) % ( | ) % ( | ||||
acls[acl_entry], | acls[acl_entry], | ||||
acl_entry, | acl_entry, | ||||
folder | folder | ||||
), | ), | ||||
level=8 | level=8 | ||||
) | ) | ||||
self.set_acl(folder, acl_entry, '') | self.set_acl(folder, acl_entry, '') | ||||
except Exception, errmsg: | except Exception as errmsg: | ||||
log.error( | log.error( | ||||
_("Failed to read/set ACL on folder %s: %r") % ( | _("Failed to read/set ACL on folder %s: %r") % ( | ||||
folder, | folder, | ||||
errmsg | errmsg | ||||
) | ) | ||||
) | ) | ||||
def connect(self, uri=None, server=None, domain=None, login=True): | def connect(self, uri=None, server=None, domain=None, login=True): | ||||
▲ Show 20 Lines • Show All 137 Lines • ▼ Show 20 Lines | class IMAP(object): | ||||
def disconnect(self, server=None): | def disconnect(self, server=None): | ||||
if server is None: | if server is None: | ||||
# No server specified, but make sure self.imap is None anyways | # No server specified, but make sure self.imap is None anyways | ||||
if hasattr(self, 'imap'): | if hasattr(self, 'imap'): | ||||
del self.imap | del self.imap | ||||
# Empty out self._imap as well | # Empty out self._imap as well | ||||
for key in self._imap.keys(): | for key in self._imap: | ||||
del self._imap[key] | del self._imap[key] | ||||
else: | else: | ||||
if server in self._imap: | if server in self._imap: | ||||
del self._imap[server] | del self._imap[server] | ||||
else: | else: | ||||
log.warning( | log.warning( | ||||
_("Called imap.disconnect() on a server that we had no connection to.") | _("Called imap.disconnect() on a server that we had no connection to.") | ||||
▲ Show 20 Lines • Show All 150 Lines • ▼ Show 20 Lines | def set_acl(self, folder, identifier, acl): | ||||
continue | continue | ||||
if char == '+': | if char == '+': | ||||
mode = 'add' | mode = 'add' | ||||
continue | continue | ||||
acl_map[mode] += char | acl_map[mode] += char | ||||
current_acls = self.imap.lam(self.folder_utf7(folder)) | current_acls = self.imap.lam(self.folder_utf7(folder)) | ||||
for current_acl in current_acls.keys(): | for current_acl in current_acls: | ||||
if current_acl == identifier: | if current_acl == identifier: | ||||
_acl = current_acls[current_acl] | _acl = current_acls[current_acl] | ||||
break | break | ||||
_acl = _acl + acl_map['set'] + acl_map['add'] | _acl = _acl + acl_map['set'] + acl_map['add'] | ||||
_acl = [x for x in _acl.split() if x not in acl_map['subtract'].split()] | _acl = [x for x in _acl.split() if x not in acl_map['subtract'].split()] | ||||
acl = ''.join(list(set(_acl))) | acl = ''.join(list(set(_acl))) | ||||
▲ Show 20 Lines • Show All 125 Lines • ▼ Show 20 Lines | def user_mailbox_create(self, mailbox_base_name, server=None): | ||||
_additional_folders = None | _additional_folders = None | ||||
if not hasattr(self, 'domain'): | if not hasattr(self, 'domain'): | ||||
self.domain = None | self.domain = None | ||||
if self.domain is None and len(mailbox_base_name.split('@')) > 1: | if self.domain is None and len(mailbox_base_name.split('@')) > 1: | ||||
self.domain = mailbox_base_name.split('@')[1] | self.domain = mailbox_base_name.split('@')[1] | ||||
if not self.domain is None: | if not self.domain is None: | ||||
Lint: PEP8 E714: test for object identity should be 'is not' | |||||
if conf.has_option(self.domain, "autocreate_folders"): | if conf.has_option(self.domain, "autocreate_folders"): | ||||
_additional_folders = conf.get_raw( | _additional_folders = conf.get_raw( | ||||
self.domain, | self.domain, | ||||
"autocreate_folders" | "autocreate_folders" | ||||
) | ) | ||||
else: | else: | ||||
from pykolab.auth import Auth | from pykolab.auth import Auth | ||||
auth = Auth() | auth = Auth() | ||||
auth.connect() | auth.connect() | ||||
domains = auth.list_domains(self.domain) | domains = auth.list_domains(self.domain) | ||||
auth.disconnect() | auth.disconnect() | ||||
if len(domains.keys()) > 0: | if len(domains) > 0: | ||||
if self.domain in domains: | if self.domain in domains: | ||||
primary = domains[self.domain] | primary = domains[self.domain] | ||||
if conf.has_option(primary, "autocreate_folders"): | if conf.has_option(primary, "autocreate_folders"): | ||||
_additional_folders = conf.get_raw( | _additional_folders = conf.get_raw( | ||||
primary, | primary, | ||||
"autocreate_folders" | "autocreate_folders" | ||||
) | ) | ||||
Show All 14 Lines | def user_mailbox_create(self, mailbox_base_name, server=None): | ||||
) | ) | ||||
if additional_folders is not None: | if additional_folders is not None: | ||||
self.user_mailbox_create_additional_folders( | self.user_mailbox_create_additional_folders( | ||||
mailbox_base_name, | mailbox_base_name, | ||||
additional_folders | additional_folders | ||||
) | ) | ||||
if not self.domain is None: | if not self.domain is None: | ||||
Lint: PEP8 E714 test for object identity should be 'is not' Lint: PEP8 E714: test for object identity should be 'is not' | |||||
if conf.has_option(self.domain, "sieve_mgmt"): | if conf.has_option(self.domain, "sieve_mgmt"): | ||||
sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt') | sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt') | ||||
if utils.true_or_false(sieve_mgmt_enabled): | if utils.true_or_false(sieve_mgmt_enabled): | ||||
conf.plugins.exec_hook( | conf.plugins.exec_hook( | ||||
'sieve_mgmt_refresh', | 'sieve_mgmt_refresh', | ||||
kw={ | kw={ | ||||
'user': mailbox_base_name | 'user': mailbox_base_name | ||||
} | } | ||||
Show All 34 Lines | def user_mailbox_create_additional_folders(self, user, additional_folders): | ||||
last_log = time.time() | last_log = time.time() | ||||
if conf.debuglevel > 8: | if conf.debuglevel > 8: | ||||
import traceback | import traceback | ||||
traceback.print_exc() | traceback.print_exc() | ||||
time.sleep(0.5) | time.sleep(0.5) | ||||
for additional_folder in additional_folders.keys(): | for additional_folder in additional_folders: | ||||
_add_folder = {} | _add_folder = {} | ||||
folder_name = additional_folder | folder_name = additional_folder | ||||
if not folder_name.startswith(personal): | if not folder_name.startswith(personal): | ||||
log.error(_("Correcting additional folder name from %r to %r") % (folder_name, "%s%s" % (personal, folder_name))) | log.error(_("Correcting additional folder name from %r to %r") % (folder_name, "%s%s" % (personal, folder_name))) | ||||
folder_name = "%s%s" % (personal, folder_name) | folder_name = "%s%s" % (personal, folder_name) | ||||
success = self._create_folder_waiting(folder_name) | success = self._create_folder_waiting(folder_name) | ||||
if not success: | if not success: | ||||
log.warning(_("Failed to create folder: %s") % (folder_name)) | log.warning(_("Failed to create folder: %s") % (folder_name)) | ||||
continue | continue | ||||
if "annotations" in additional_folders[additional_folder]: | if "annotations" in additional_folders[additional_folder]: | ||||
for annotation in additional_folders[additional_folder]["annotations"].keys(): | for annotation in additional_folders[additional_folder]["annotations"]: | ||||
self.set_metadata( | self.set_metadata( | ||||
folder_name, | folder_name, | ||||
"%s" % (annotation), | "%s" % (annotation), | ||||
"%s" % (additional_folders[additional_folder]["annotations"][annotation]) | "%s" % (additional_folders[additional_folder]["annotations"][annotation]) | ||||
) | ) | ||||
if "acls" in additional_folders[additional_folder]: | if "acls" in additional_folders[additional_folder]: | ||||
for acl in additional_folders[additional_folder]["acls"].keys(): | for acl in additional_folders[additional_folder]["acls"]: | ||||
self.set_acl( | self.set_acl( | ||||
folder_name, | folder_name, | ||||
"%s" % (acl), | "%s" % (acl), | ||||
"%s" % (additional_folders[additional_folder]["acls"][acl]) | "%s" % (additional_folders[additional_folder]["acls"][acl]) | ||||
) | ) | ||||
if len(user.split('@')) > 1: | if len(user.split('@')) > 1: | ||||
localpart = user.split('@')[0] | localpart = user.split('@')[0] | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def user_mailbox_create_additional_folders(self, user, additional_folders): | ||||
try: | try: | ||||
self.subscribe(_folder) | self.subscribe(_folder) | ||||
except Exception as errmsg: | except Exception as errmsg: | ||||
log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg)) | log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg)) | ||||
self.logout() | self.logout() | ||||
self.connect(domain=self.domain) | self.connect(domain=self.domain) | ||||
for additional_folder in additional_folders.keys(): | for additional_folder in additional_folders: | ||||
if additional_folder.startswith(personal) and not personal == '': | if additional_folder.startswith(personal) and not personal == '': | ||||
folder_name = additional_folder.replace(personal, '') | folder_name = additional_folder.replace(personal, '') | ||||
else: | else: | ||||
folder_name = additional_folder | folder_name = additional_folder | ||||
folder_name = "user%s%s%s%s%s" % ( | folder_name = "user%s%s%s%s%s" % ( | ||||
self.get_separator(), | self.get_separator(), | ||||
localpart, | localpart, | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | def user_mailbox_rename(self, old_name, new_name, partition=None): | ||||
new_name = "user%s%s" % (self.get_separator(), new_name) | new_name = "user%s%s" % (self.get_separator(), new_name) | ||||
if old_name == new_name and partition is None: | if old_name == new_name and partition is None: | ||||
return | return | ||||
if not self.has_folder(old_name): | if not self.has_folder(old_name): | ||||
log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) | log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) | ||||
if not self.has_folder(new_name) or not partition is None: | if not self.has_folder(new_name) or not partition is None: | ||||
Lint: PEP8 E714 test for object identity should be 'is not' Lint: PEP8 E714: test for object identity should be 'is not' | |||||
log.info(_("Renaming INBOX from %s to %s") % (old_name, new_name)) | log.info(_("Renaming INBOX from %s to %s") % (old_name, new_name)) | ||||
try: | try: | ||||
self.imap.rename(old_name, new_name, partition) | self.imap.rename(old_name, new_name, partition) | ||||
except: | except: | ||||
log.error(_("Could not rename INBOX folder %s to %s") % (old_name, new_name)) | log.error(_("Could not rename INBOX folder %s to %s") % (old_name, new_name)) | ||||
else: | else: | ||||
log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name, new_name)) | log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name, new_name)) | ||||
▲ Show 20 Lines • Show All 395 Lines • Show Last 20 Lines |
test for object identity should be 'is not'