Page MenuHomePhorge

No OneTemporary

Authored By
Unknown
Size
40 KB
Referenced Files
None
Subscribers
None
diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py
index 056b916..003db79 100644
--- a/pykolab/imap/__init__.py
+++ b/pykolab/imap/__init__.py
@@ -1,554 +1,568 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 or, at your option, any later version
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
import logging
import re
import time
import sys
from urlparse import urlparse
import pykolab
from pykolab.translate import _
log = pykolab.getLogger('pykolab.imap')
conf = pykolab.getConf()
auth = pykolab.auth
class IMAP(object):
def __init__(self):
# Pool of named IMAP connections, by hostname
self._imap = {}
# Place holder for the current IMAP connection
self.imap = None
self.users = []
self.inbox_folders = []
def connect(self, uri=None, login=True):
backend = conf.get('kolab', 'imap_backend')
if uri == None:
uri = conf.get(backend, 'uri')
result = urlparse(uri)
if hasattr(result, 'hostname'):
hostname = result.hostname
else:
scheme = uri.split(':')[0]
(hostname, port) = uri.split('/')[2].split(':')
# Get the credentials
admin_login = conf.get(backend, 'admin_login')
admin_password = conf.get(backend, 'admin_password')
if not self._imap.has_key(hostname):
if backend == 'cyrus-imap':
import cyrus
self._imap[hostname] = cyrus.Cyrus(uri)
# Actually connect
if login:
log.debug(_("Logging on to Cyrus IMAP server %s") %(hostname), level=8)
self._imap[hostname].login(admin_login, admin_password)
elif backend == 'dovecot':
import dovecot
self._imap[hostname] = dovecot.Dovecot(uri)
# Actually connect
if login:
log.debug(_("Logging on to Dovecot IMAP server %s") %(hostname), level=8)
self._imap[hostname].login(admin_login, admin_password)
else:
import imap
self._imap[hostname] = imap.IMAP(uri)
# Actually connect
if login:
log.debug(_("Logging on to generic IMAP server %s") %(hostname), level=8)
self._imap[hostname].login(admin_login, admin_password)
else:
log.debug(_("Reusing existing IMAP server connection to %s") %(hostname), level=8)
# Set the newly created technology specific IMAP library as the current
# IMAP connection to be used.
self.imap = self._imap[hostname]
def disconnect(self, server=None):
if server == None:
# No server specified, but make sure self.imap is None anyways
self.imap = None
else:
if self._imap.has_key(server):
del self._imap[server]
else:
log.warning(_("Called imap.disconnect() on a server that " + \
"we had no connection to"))
def __getattr__(self, name):
if hasattr(self.imap, name):
return getattr(self.imap, name)
else:
raise AttributeError, _("%r has no attribute %s") %(self,name)
def has_folder(self, folder):
folders = self.imap.lm(folder)
log.debug(_("Looking for folder '%s', we found folders: %r") %(folder,folders), level=8)
# Greater then one, this folder may have subfolders.
if len(folders) > 0:
return True
else:
return False
def move_user_folders(self, users=[]):
for user in users:
if type(user) == dict:
if user.has_key('old_mail'):
inbox = "user/%s" %(user['mail'])
old_inbox = "user/%s" %(user['old_mail'])
if self.has_folder(old_inbox):
log.debug(_("Found old INBOX folder %s") %(old_inbox), level=8)
if not self.has_folder(inbox):
+ log.info(_("Renaming INBOX from %s to %s") %(old_inbox,inbox))
self.imap.rename(old_inbox,inbox)
self.inbox_folders.append(inbox)
else:
log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") %(old_inbox,inbox))
else:
log.debug(_("Did not find old folder user/%s to rename") %(user['old_mail']), level=8)
else:
log.debug(_("Value for user is not a dictionary"), level=8)
def create_user_folders(self, users, primary_domain, secondary_domains):
inbox_folders = []
domain_section = auth.domain_section(primary_domain)
folders = self.list_user_folders(primary_domain, secondary_domains)
# See if the folder belongs to any of the users
_match_attr = conf.get('cyrus-sasl', 'result_attribute')
if not users:
users = auth.list_users(primary_domain)
for user in users:
if type(user) == dict:
if user.has_key(_match_attr):
inbox_folders.append(user[_match_attr].lower())
+ else:
+ # If the user passed on to this function does not have
+ # a key for _match_attr, then we have to bail out and
+ # continue
+ continue
+
elif type(user) == str:
inbox_folders.append(user.lower())
for folder in inbox_folders:
additional_folders = None
try:
if folders.index(folder) > -1:
continue
else:
# TODO: Perhaps this block is moot
log.info(_("Creating new INBOX for user (%d): %s")
%(1,folder))
try:
self.imap.cm("user/%s" %(folder))
except:
log.warning(_("Mailbox already exists: user/%s")
%(folder))
continue
if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
self.imap._setquota("user/%s" %(folder),0)
except:
# TODO: Perhaps this block is moot
log.info(_("Creating new INBOX for user (%d): %s") %(2,folder))
try:
self.imap.cm("user/%s" %(folder))
except:
log.warning(_("Mailbox already exists: user/%s") %(folder))
continue
self.imap._setquota("user/%s" %(folder),0)
if conf.has_option(domain_section, "autocreate_folders"):
_additional_folders = conf.get_raw(domain_section, "autocreate_folders")
additional_folders = conf.plugins.exec_hook("create_user_folders",
kw={
'folder': folder,
'additional_folders': _additional_folders
}
)
if not additional_folders == None:
self.create_user_additional_folders(folder, additional_folders)
return inbox_folders
def create_user_additional_folders(self, folder, additional_folders):
self.connect()
for additional_folder in additional_folders.keys():
_add_folder = {}
if len(folder.split('@')) > 1:
folder_name = "user%(seperator)s%(username)s%(seperator)s%(additional_folder_name)s@%(domainname)s"
_add_folder['username'] = folder.split('@')[0]
_add_folder['domainname'] = folder.split('@')[1]
_add_folder['additional_folder_name'] = additional_folder
_add_folder['seperator'] = self.imap.seperator
folder_name = folder_name % _add_folder
else:
folder_name = "user%(seperator)s%(username)s%(seperator)s%(additional_folder_name)s" % {
"username": folder,
"seperator": self.imap.seperator,
"additional_folder_name": additional_folder
}
try:
self.imap.cm(folder_name)
except:
log.warning(_("Mailbox already exists: user/%s") %(folder))
if additional_folders[additional_folder].has_key("annotations"):
for annotation in additional_folders[additional_folder]["annotations"].keys():
if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
self.imap._setannotation(
folder_name,
"%s" %(annotation),
"%s" %(additional_folders[additional_folder]["annotations"][annotation])
)
if additional_folders[additional_folder].has_key("acls"):
for acl in additional_folders[additional_folder]["acls"].keys():
self.imap.sam(
folder_name,
"%s" %(acl),
"%s" %(additional_folders[additional_folder]["acls"][acl])
)
def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]):
+ """
+
+ Sets the quota in IMAP using the authentication and authorization
+ database 'quota' attribute for the users listed in parameter 'users'
+ """
+
self.connect()
if conf.has_option(primary_domain, 'quota_attribute'):
_quota_attr = conf.get(primary_domain, 'quota_attribute')
else:
auth_mechanism = conf.get('kolab', 'auth_mechanism')
_quota_attr = conf.get(auth_mechanism, 'quota_attribute')
_inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute')
default_quota = auth.domain_default_quota(primary_domain)
- if default_quota == "":
+ if default_quota == "" or default_quota == None:
default_quota = 0
if len(users) == 0:
users = auth.list_users(primary_domain)
for user in users:
quota = None
if type(user) == dict:
if user.has_key(_quota_attr):
- #print "user has key"
if type(user[_quota_attr]) == list:
quota = user[_quota_attr].pop(0)
elif type(user[_quota_attr]) == str:
quota = user[_quota_attr]
else:
_quota = auth.get_user_attribute(primary_domain, user, _quota_attr)
if _quota == None:
quota = 0
else:
quota = _quota
if not user.has_key(_inbox_folder_attr):
continue
else:
if type(user[_inbox_folder_attr]) == list:
folder = "user/%s" % user[_inbox_folder_attr].pop(0)
elif type(user[_inbox_folder_attr]) == str:
folder = "user/%s" % user[_inbox_folder_attr]
elif type(user) == str:
quota = auth.get_user_attribute(user, 'quota')
folder = "user/%s" %(user)
folder = folder.lower()
try:
(used,current_quota) = self.imap.lq(folder)
except:
# TODO: Go in fact correct the quota.
log.warning(_("Cannot get current IMAP quota for folder %s") %(folder))
- continue
+ used = 0
+ current_quota = 0
new_quota = conf.plugins.exec_hook("set_user_folder_quota", kw={
'used': used,
'current_quota': current_quota,
- 'new_quota': int(quota),
- 'default_quota': default_quota
+ 'new_quota': (int)(quota),
+ 'default_quota': (int)(default_quota)
}
)
log.debug(_("Quota for %s currently is %s") %(folder, current_quota), level=7)
if new_quota == None:
continue
if not int(new_quota) == int(quota):
log.info(_("Adjusting authentication database quota for folder %s to %d") %(folder,int(new_quota)))
quota = int(new_quota)
auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota)
if not int(current_quota) == int(quota):
- #log.info(_("Correcting quota for %s to %s (currently %s)") %(folder, quota, current_quota))
+ log.info(_("Correcting quota for %s to %s (currently %s)") %(folder, quota, current_quota))
self.imap._setquota(folder, quota)
def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]):
self.connect()
if conf.has_option(primary_domain, 'mailserver_attribute'):
_mailserver_attr = conf.get(primary_domain, 'mailserver_attribute')
else:
auth_mechanism = conf.get('kolab', 'auth_mechanism')
_mailserver_attr = conf.get(auth_mechanism, 'mailserver_attribute')
_inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute')
if len(users) == 0:
users = auth.list_users(primary_domain)
for user in users:
mailhost = None
if type(user) == dict:
if user.has_key(_mailserver_attr):
- #print "user has key"
if type(user[_mailserver_attr]) == list:
_mailserver = user[_mailserver_attr].pop(0)
elif type(user[_mailserver_attr]) == str:
_mailserver = user[_mailserver_attr]
else:
_mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr)
if not user.has_key(_inbox_folder_attr):
continue
else:
if type(user[_inbox_folder_attr]) == list:
folder = "user/%s" % user[_inbox_folder_attr].pop(0)
elif type(user[_inbox_folder_attr]) == str:
folder = "user/%s" % user[_inbox_folder_attr]
elif type(user) == str:
_mailserver = auth.get_user_attribute(user, _mailserver_attr)
folder = "user/%s" %(user)
folder = folder.lower()
- _current_mailserver = self.imap.find_mailbox_server(folder)
+ _current_mailserver = self.imap.find_mailfolder_server(folder)
if not _mailserver == None:
# TODO:
if not _current_mailserver == _mailserver:
self.imap._xfer(folder, _current_mailserver, _mailserver)
-
- auth.set_user_attribute(primary_domain, user, _mailserver_attr, _mailserver)
else:
auth.set_user_attribute(primary_domain, user, _mailserver_attr, _current_mailserver)
- def parse_mailbox(self, mailbox):
+ def parse_mailfolder(self, mailfolder):
self.connect()
- return self.imap.parse_mailbox(mailbox)
+ return self.imap.parse_mailfolder(mailfolder)
def expunge_user_folders(self, inbox_folders=None):
"""
Delete folders that have no equivalent user qualifier in the list
of users passed to this function, ...
TODO: Explain the domain parameter, and actually get it to work
properly. This also relates to threading for multi-domain
deployments.
Parameters:
users
A list of users. Can be a list of user qualifiers, e.g.
[ 'user1', 'user2' ] or a list of user attribute
dictionaries, e.g. [ { 'user1': { 'attr': 'value' } } ]
primary_domain, secondary_domains
"""
self.connect()
if inbox_folders == None:
inbox_folders = []
folders = self.list_user_folders()
for folder in folders:
log.debug(_("Checking folder: %s") %(folder), level=1)
try:
if inbox_folders.index(folder) > -1:
continue
else:
log.info(_("Folder has no corresponding user (1): %s") %(folder))
- self.delete("user/%s" %(folder))
+ self.delete_mailfolder("user/%s" %(folder))
except:
log.info(_("Folder has no corresponding user (2): %s") %(folder))
try:
- self.delete("user/%s" %(folder))
+ self.delete_mailfolder("user/%s" %(folder))
except:
pass
- def delete(self, mailbox_path):
- mbox_parts = self.parse_mailbox(mailbox_path)
+ def delete_mailfolder(self, mailfolder_path):
+ """
+ Deletes a mail folder described by mailfolder_path.
+ """
+
+ mbox_parts = self.parse_mailfolder(mailfolder_path)
if mbox_parts == None:
# We got user identifier only
log.error(_("Please don't give us just a user identifier"))
return
- #self.imap.dm(mailbox_path)
+ self.imap.dm(mailfolder_path)
clean_acls = False
section = False
+
if mbox_parts['domain']:
if conf.has_option(mbox_parts['domain'], 'delete_clean_acls'):
section = mbox_parts['domain']
elif conf.has_option('kolab', 'delete_clean_acls'):
section = 'kolab'
elif conf.has_option('kolab', 'delete_clean_acls'):
section = 'kolab'
if not section == False:
clean_acls = conf.get(section, 'delete_clean_acls')
if not clean_acls == False and not clean_acls == 0:
log.info(_("Cleaning up ACL entries across all folders"))
if mbox_parts['domain']:
# List the shared and user folders
shared_folders = self.imap.lm(
"shared/*@%s" %(mbox_parts['domain'])
)
user_folders = self.imap.lm(
"user/*@%s" %(mbox_parts['domain'])
)
aci_identifier = "%s@%s" %(
mbox_parts['path_parts'][1],
mbox_parts['domain']
)
else:
shared_folders = self.imap.lm("shared/*")
user_folders = self.imap.lm("user/*")
aci_identifier = "%s" %(mbox_parts['path_parts'][1])
log.debug(
_("Cleaning up ACL entries referring to identifier %s") %(
aci_identifier
),
level=5
)
# For all folders (shared and user), ...
folders = user_folders + shared_folders
log.debug(_("Iterating over %d folders") %(len(folders)), level=5)
# ... loop through them and ...
for folder in folders:
# ... list the ACL entries
acls = self.imap.lam(folder)
# For each ACL entry, see if we think it is a current, valid entry
for acl_entry in acls.keys():
# If the key 'acl_entry' does not exist in the dictionary of valid
# ACL entries, this ACL entry has got to go.
if acl_entry == aci_identifier:
# Set the ACL to '' (effectively deleting the ACL entry)
self.imap.sam(folder, acl_entry, '')
def list_user_folders(self, primary_domain=None, secondary_domains=[]):
"""
List the INBOX folders in the IMAP backend. Returns a list of unique
base folder names.
"""
self.connect()
_folders = self.imap.lm("user/%")
# TODO: Replace the .* below with a regex representing acceptable DNS
# domain names.
domain_re = ".*\.?%s$"
acceptable_domain_name_res = []
if not primary_domain == None:
for domain in [ primary_domain ] + secondary_domains:
acceptable_domain_name_res.append(domain_re %(domain))
folders = []
for folder in _folders:
folder_name = None
if len(folder.split('@')) > 1:
+ # TODO: acceptable domain name spaces
#acceptable = False
#for domain_name_re in acceptable_domain_name_res:
#prog = re.compile(domain_name_re)
#if prog.match(folder.split('@')[1]):
#print "Acceptable indeed"
#acceptable = True
#if not acceptable:
#print "%s is not acceptable against %s yet using %s" %(folder.split('@')[1],folder,domain_name_re)
#if acceptable:
#folder_name = "%s@%s" %(folder.split(self.seperator)[1].split('@')[0],folder.split('@')[1])
folder_name = "%s@%s" %(folder.split(self.imap.seperator)[1].split('@')[0],folder.split('@')[1])
else:
folder_name = "%s" %(folder.split(self.imap.seperator)[1])
if not folder_name == None:
if not folder_name in folders:
folders.append(folder_name)
- #print folders
-
return folders
def synchronize(self, users=[], primary_domain=None, secondary_domains=[]):
self.connect()
self.users.extend(users)
self.move_user_folders(users)
self.inbox_folders.extend(self.create_user_folders(users, primary_domain, secondary_domains))
self.set_user_folder_quota(users, primary_domain, secondary_domains, self.inbox_folders)
self.set_user_mailhost(users, primary_domain, secondary_domains, self.inbox_folders)
def lm(self, *args, **kw):
return self.imap.lm(*args, **kw)
- def undelete(self, *args, **kw):
- self.imap.undelete(*args, **kw)
+ def undelete_mailfolder(self, *args, **kw):
+ self.imap.undelete_mailfolder(*args, **kw)
diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py
index ac864aa..c0f8c04 100644
--- a/pykolab/imap/cyrus.py
+++ b/pykolab/imap/cyrus.py
@@ -1,365 +1,365 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 or, at your option, any later version
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
import cyruslib
import time
from urlparse import urlparse
import pykolab
from pykolab.translate import _
+log = pykolab.getLogger('pykolab.imap')
conf = pykolab.getConf()
-log = pykolab.getLogger('pykolab.imap.cyrus')
imap = pykolab.imap
class Cyrus(cyruslib.CYRUS):
"""
Abstraction class for some common actions to do exclusively in Cyrus.
For example, the following functions require the commands to be
executed against the backend server if a murder is being used.
- Setting quota
- Renaming the top-level mailbox
- Setting annotations
"""
setquota = cyruslib.CYRUS.sq
def __init__(self, uri):
"""
Initialize this class, but do not connect yet.
"""
port = None
result = urlparse(uri)
if hasattr(result, 'hostname'):
scheme = result.scheme
hostname = result.hostname
port = result.port
else:
scheme = uri.split(':')[0]
(hostname, port) = uri.split('/')[2].split(':')
if not port:
if scheme == 'imap':
port = 143
else:
port = 993
self.server = hostname
self.uri = "%s://%s:%s" %(scheme,hostname,port)
cyruslib.CYRUS.__init__(self, self.uri)
if conf.debuglevel > 8:
self.VERBOSE = True
# Initialize our variables
self.seperator = self.SEP
# Placeholder for known mailboxes on known servers
self.mbox = {}
def __del__(self):
pass
def login(self, *args, **kw):
"""
Login to the Cyrus IMAP server through cyruslib.CYRUS, but set our
hierarchy seperator.
"""
cyruslib.CYRUS.login(self, *args, **kw)
self.seperator = self.SEP
self.murder = False
for capability in self.m.capabilities:
if capability.startswith("MUPDATE="):
log.debug(_("Detected we are running in a Murder topology"), level=8)
self.murder = True
if not self.murder:
log.debug(_("This system is not part of a murder topology"), level=8)
- def find_mailbox_server(self, mailbox):
+ def find_mailfolder_server(self, mailfolder):
annotations = {}
- _mailbox = self.parse_mailbox(mailbox)
- prefix = _mailbox['path_parts'].pop(0)
- mbox = _mailbox['path_parts'].pop(0)
- if not _mailbox['domain'] == None:
- mailbox = "%s%s%s@%s" %(prefix,self.seperator,mbox,_mailbox['domain'])
+ _mailfolder = self.parse_mailfolder(mailfolder)
+ prefix = _mailfolder['path_parts'].pop(0)
+ mbox = _mailfolder['path_parts'].pop(0)
+ if not _mailfolder['domain'] == None:
+ mailfolder = "%s%s%s@%s" %(prefix,self.seperator,mbox,_mailfolder['domain'])
# TODO: Workaround for undelete
- if len(self.lm(mailbox)) < 1:
+ if len(self.lm(mailfolder)) < 1:
return self.server
if not self.murder:
return self.server
- log.debug(_("Checking actual backend server for folder %s through annotations") %(mailbox), level=8)
- if self.mbox.has_key(mailbox):
- return self.mbox[mailbox]
+ log.debug(_("Checking actual backend server for folder %s through annotations") %(mailfolder), level=8)
+ if self.mbox.has_key(mailfolder):
+ return self.mbox[mailfolder]
max_tries = 20
num_try = 0
while 1:
num_try += 1
- annotations = self._getannotation(mailbox, "/vendor/cmu/cyrus-imapd/server")
+ annotations = self._getannotation(mailfolder, "/vendor/cmu/cyrus-imapd/server")
- if annotations.has_key(mailbox):
+ if annotations.has_key(mailfolder):
break
if max_tries <= num_try:
log.error(_("Could not get the annotations after %s tries.") %(num_try))
- annotations = { mailbox: { '/vendor/cmu/cyrus-imapd/server': self.server }}
+ annotations = { mailfolder: { '/vendor/cmu/cyrus-imapd/server': self.server }}
break
- log.warning(_("No annotations for %s: %r") %(mailbox,annotations))
+ log.warning(_("No annotations for %s: %r") %(mailfolder,annotations))
time.sleep(1)
- server = annotations[mailbox]['/vendor/cmu/cyrus-imapd/server']
- self.mbox[mailbox] = server
+ server = annotations[mailfolder]['/vendor/cmu/cyrus-imapd/server']
+ self.mbox[mailfolder] = server
if not server == self.server:
if imap._imap.has_key(server):
- if not imap._imap[server].mbox.has_key(mailbox):
- imap._imap[server].mbox[mailbox] = server
+ if not imap._imap[server].mbox.has_key(mailfolder):
+ imap._imap[server].mbox[mailfolder] = server
- log.debug(_("Server for INBOX folder %s is %s") %(mailbox,server), level=8)
+ log.debug(_("Server for INBOX folder %s is %s") %(mailfolder,server), level=8)
return server
- def _setquota(self, mailbox, quota):
+ def _setquota(self, mailfolder, quota):
"""
Login to the actual backend server.
"""
- server = self.find_mailbox_server(mailbox)
+ server = self.find_mailfolder_server(mailfolder)
imap.connect('imap://%s:143' %(server))
- log.debug(_("Setting quota for INBOX folder %s to %s") %(mailbox,quota), level=8)
+ log.debug(_("Setting quota for INBOX folder %s to %s") %(mailfolder,quota), level=8)
try:
- imap.setquota(mailbox, quota)
+ imap.setquota(mailfolder, quota)
except:
- log.error(_("Could not set quota for mailbox %s") %(mailbox))
+ log.error(_("Could not set quota for mailfolder %s") %(mailfolder))
- def _rename(self, from_mailbox, to_mailbox, partition=None):
+ def _rename(self, from_mailfolder, to_mailfolder, partition=None):
"""
Login to the actual backend server, then rename.
"""
- server = self.find_mailbox_server(from_mailbox)
+ server = self.find_mailfolder_server(from_mailfolder)
imap.connect('imap://%s:143' %(server))
- log.debug(_("Moving INBOX folder %s to %s") %(from_mailbox,to_mailbox), level=8)
- imap.rename(from_mailbox, to_mailbox, partition)
+ log.debug(_("Moving INBOX folder %s to %s") %(from_mailfolder,to_mailfolder), level=8)
+ imap.rename(from_mailfolder, to_mailfolder, partition)
def _getannotation(self, *args, **kw):
imap.connect()
return imap.getannotation(*args, **kw)
- def _setannotation(self, mailbox, annotation, value):
+ def _setannotation(self, mailfolder, annotation, value):
"""
Login to the actual backend server, then set annotation.
"""
- server = self.find_mailbox_server(mailbox)
+ server = self.find_mailfolder_server(mailfolder)
imap.connect('imap://%s:143' %(server))
- log.debug(_("Setting annotation %s on folder %s") %(annotation,mailbox), level=8)
- imap.setannotation(mailbox, annotation, value)
+ log.debug(_("Setting annotation %s on folder %s") %(annotation,mailfolder), level=8)
+ imap.setannotation(mailfolder, annotation, value)
- def _xfer(self, mailbox, current_server, new_server):
+ def _xfer(self, mailfolder, current_server, new_server):
imap.connect('imap://%s:143' %(current_server))
- log.debug(_("Transferring folder %s from %s to %s") %(mailbox, current_server, new_server), level=8)
- imap.xfer(mailbox, new_server)
+ log.debug(_("Transferring folder %s from %s to %s") %(mailfolder, current_server, new_server), level=8)
+ imap.xfer(mailfolder, new_server)
- def undelete(self, mailbox, to_mailbox=None, recursive=True):
+ def undelete_mailfolder(self, mailfolder, to_mailfolder=None, recursive=True):
"""
- Login to the actual backend server, then "undelete" the mailbox.
+ Login to the actual backend server, then "undelete" the mailfolder.
- 'mailbox' may be a string representing either of the following two
+ 'mailfolder' may be a string representing either of the following two
options;
- the fully qualified pathof the deleted folder in its current
location, such as, for a deleted INBOX folder originally known as
"user/userid[@domain]";
"DELETED/user/userid/hex[@domain]"
- the original folder name, such as;
"user/userid[@domain]"
- 'to_mailbox' may be the target folder to "undelete" the deleted
+ 'to_mailfolder' may be the target folder to "undelete" the deleted
folder to. If not specified, the original folder name is used.
"""
# Placeholder for folders we have recovered already.
target_folders = []
- mailbox = self.parse_mailbox(mailbox)
+ mailfolder = self.parse_mailfolder(mailfolder)
- undelete_folders = self._find_deleted_folder(mailbox)
+ undelete_folders = self._find_deleted_folder(mailfolder)
- if not to_mailbox == None:
- target_mbox = self.parse_mailbox(to_mailbox)
+ if not to_mailfolder == None:
+ target_mbox = self.parse_mailfolder(to_mailfolder)
else:
- target_mbox = mailbox
+ target_mbox = mailfolder
for undelete_folder in undelete_folders:
- undelete_mbox = self.parse_mailbox(undelete_folder)
+ undelete_mbox = self.parse_mailfolder(undelete_folder)
prefix = undelete_mbox['path_parts'].pop(0)
mbox = undelete_mbox['path_parts'].pop(0)
- if to_mailbox == None:
+ if to_mailfolder == None:
target_folder = self.seperator.join([prefix,mbox])
else:
target_folder = self.seperator.join(target_mbox['path_parts'])
- if not to_mailbox == None:
+ if not to_mailfolder == None:
target_folder = "%s%s%s" %(target_folder,self.seperator,mbox)
if not len(undelete_mbox['path_parts']) == 0:
target_folder = "%s%s%s" %(target_folder,self.seperator,self.seperator.join(undelete_mbox['path_parts']))
if target_folder in target_folders:
target_folder = "%s%s%s" %(target_folder,self.seperator,undelete_mbox['hex_timestamp'])
target_folders.append(target_folder)
if not target_mbox['domain'] == None:
target_folder = "%s@%s" %(target_folder,target_mbox['domain'])
log.info(_("Undeleting %s to %s") %(undelete_folder,target_folder))
- target_server = self.find_mailbox_server(target_folder)
+ target_server = self.find_mailfolder_server(target_folder)
if not target_server == self.server:
self.xfer(undelete_folder,target_server)
self.rename(undelete_folder,target_folder)
- def parse_mailbox(self, mailbox):
+ def parse_mailfolder(self, mailfolder):
"""
- Parse a mailbox name to it's parts.
+ Parse a mailfolder name to it's parts.
- Takes a fully qualified mailbox or mailbox sub-folder.
+ Takes a fully qualified mailfolder or mailfolder sub-folder.
"""
mbox = {
'domain': None
}
# Split off the virtual domain identifier, if any
- if len(mailbox.split('@')) > 1:
- mbox['domain'] = mailbox.split('@')[1]
- mbox['path_parts'] = mailbox.split('@')[0].split(self.seperator)
+ if len(mailfolder.split('@')) > 1:
+ mbox['domain'] = mailfolder.split('@')[1]
+ mbox['path_parts'] = mailfolder.split('@')[0].split(self.seperator)
else:
- mbox['path_parts'] = mailbox.split(self.seperator)
+ mbox['path_parts'] = mailfolder.split(self.seperator)
# See if the path that has been specified is the current location for
# the deleted folder, or the original location, we have to find the deleted
# folder for.
if not mbox['path_parts'][0] in [ 'user', 'shared' ]:
deleted_prefix = mbox['path_parts'].pop(0)
# See if the hexadecimal timestamp is actually hexadecimal.
# This prevents "DELETED/user/userid/Sent", but not
# "DELETED/user/userid/FFFFFF" from being specified.
try:
epoch = int(mbox['path_parts'][(len(mbox['path_parts'])-1)], 16)
try:
timestamp = time.asctime(time.gmtime(epoch))
except:
return None
except:
return None
# Verify that the input for the deleted folder is actually a
# deleted folder.
- verify_folder_search = "%(dp)s%(sep)s%(mailbox)s" % {
+ verify_folder_search = "%(dp)s%(sep)s%(mailfolder)s" % {
'dp': deleted_prefix,
'sep': self.seperator,
- 'mailbox': self.seperator.join(mbox['path_parts'])
+ 'mailfolder': self.seperator.join(mbox['path_parts'])
}
if not mbox['domain'] == None:
verify_folder_search = "%s@%s" %(verify_folder_search, mbox['domain'])
folders = self.lm(verify_folder_search)
# NOTE: Case also covered is valid hexadecimal folders; won't be the
# actual check as intended, but doesn't give you anyone else's data
# unless... See the following:
#
# TODO: Case not covered is usernames that are hexadecimal.
#
# We could probably attempt to convert the int(hex) into a time.gmtime(),
# but it still would not cover all cases.
#
# If no folders where found... well... then there you go.
if len(folders) < 1:
return None
# Pop off the hex timestamp, which turned out to be valid
mbox['hex_timestamp'] = mbox['path_parts'].pop()
return mbox
def _find_deleted_folder(self, mbox):
"""
- Give me the parts that are in an original mailbox name and I'll find
+ Give me the parts that are in an original mailfolder name and I'll find
the deleted folder name.
TODO: It finds virtdomain folders for non-virtdomain searches.
"""
- deleted_folder_search = "%(deleted_prefix)s%(seperator)s%(mailbox)s%(seperator)s*" % {
+ deleted_folder_search = "%(deleted_prefix)s%(seperator)s%(mailfolder)s%(seperator)s*" % {
# TODO: The prefix used is configurable
'deleted_prefix': "DELETED",
- 'mailbox': self.seperator.join(mbox['path_parts']),
+ 'mailfolder': self.seperator.join(mbox['path_parts']),
'seperator': self.seperator,
}
if not mbox['domain'] == None:
deleted_folder_search = "%s@%s" %(deleted_folder_search,mbox['domain'])
folders = self.lm(deleted_folder_search)
# The folders we have found at this stage include virtdomain folders.
#
# For example, having searched for user/userid, it will also find
# user/userid@example.org
#
# Here, we explicitely remove any virtdomain folders.
if mbox['domain'] == None:
_folders = []
for folder in folders:
if len(folder.split('@')) < 2:
_folders.append(folder)
folders = _folders
return folders

File Metadata

Mime Type
text/x-diff
Expires
Sun, Apr 5, 11:29 PM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18815427
Default Alt Text
(40 KB)

Event Timeline