Page MenuHomePhorge

No OneTemporary

Authored By
Unknown
Size
20 KB
Referenced Files
None
Subscribers
None
diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py
index b5f42cf..7807af0 100644
--- a/pykolab/imap/cyrus.py
+++ b/pykolab/imap/cyrus.py
@@ -1,588 +1,597 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import cyruslib
import sys
import time
from urlparse import urlparse
import pykolab
from pykolab.imap import IMAP
from pykolab.translate import _
log = pykolab.getLogger('pykolab.imap')
conf = pykolab.getConf()
+
class Cyrus(cyruslib.CYRUS):
"""
Abstraction class for some common actions to do exclusively in Cyrus.
For example, the following functions require the commands to be
executed against the backend server if a murder is being used.
- Setting quota
- Renaming the top-level mailbox
- Setting annotations
"""
setquota = cyruslib.CYRUS.sq
def __init__(self, uri):
    """
        Set up the Cyrus IMAP administration object for ``uri``.

        ``uri`` is expected to look like ``scheme://hostname[:port]``. When
        no port is given, 143 is used for the ``imap`` scheme and 993 for
        any other scheme.

        The base class constructor is retried every 10 seconds for as long
        as it raises ``cyruslib.CYRUSError``, so this call blocks until the
        constructor succeeds. No login is performed here.
    """
    port = None

    parsed = urlparse(uri)

    if hasattr(parsed, 'hostname'):
        scheme = parsed.scheme
        hostname = parsed.hostname
        port = parsed.port
    else:
        # urlparse() result without named attributes: split by hand.
        # The port is optional, so only unpack it when a ':' is present;
        # a blind two-way unpack raises ValueError for "imap://host".
        scheme = uri.split(':')[0]
        netloc = uri.split('/')[2]

        if ':' in netloc:
            (hostname, port) = netloc.split(':', 1)
        else:
            hostname = netloc

    if not port:
        if scheme == 'imap':
            port = 143
        else:
            port = 993

    self.server = hostname

    # Normalized URI, always carrying an explicit port.
    self.uri = "%s://%s:%s" % (scheme, hostname, port)

    while True:
        try:
            cyruslib.CYRUS.__init__(self, self.uri)
            break
        except cyruslib.CYRUSError:
            log.warning(
                _("Could not connect to Cyrus IMAP server %r") % (
                    self.uri
                )
            )

            time.sleep(10)

    if conf.debuglevel > 8:
        self.VERBOSE = True
        self.m.debug = 5

    # Initialize our variables
    self.separator = self.SEP

    # Placeholder for known mailboxes on known servers
    self.mbox = {}
def __del__(self):
pass
def connect(self, uri):
    """
        Dummy connect function that checks if the server that we want to
        connect to is actually the server we are connected to.

        Only the hostname in ``uri`` is compared against ``self.server``;
        scheme and port play no role. When the hostnames match, nothing
        happens.

        Otherwise pykolab.imap.IMAP.connect() is called with ``uri`` and
        ``self.separator`` is re-synchronized with ``self.SEP``. Note that
        the IMAP object created here is local to this call and is not
        stored on ``self``.
    """
    parsed = urlparse(uri)

    if hasattr(parsed, 'hostname'):
        hostname = parsed.hostname
    else:
        # urlparse() result without named attributes: take the network
        # location and strip the optional ":port" suffix. Indexing [0]
        # works whether or not a port is present.
        hostname = uri.split('/')[2].split(':')[0]

    if hostname == self.server:
        return

    imap = IMAP()
    imap.connect(uri=uri)

    if not self.SEP == self.separator:
        self.separator = self.SEP
def login(self, *args, **kw):
    """
        Log in through cyruslib.CYRUS.login() with the given arguments,
        then record the hierarchy separator and whether any advertised
        capability starts with "MUPDATE=" (``self.murder``).
    """
    cyruslib.CYRUS.login(self, *args, **kw)

    self.separator = self.SEP
    separator_message = _("Continuing with separator: %r") % (self.separator)
    log.debug(separator_message, level=8)

    self.murder = False

    for cap in self.m.capabilities:
        if not cap.startswith("MUPDATE="):
            continue

        log.debug(
            _("Detected we are running in a Murder topology"),
            level=8
        )

        self.murder = True

    if self.murder:
        return

    log.debug(
        _("This system is not part of a murder topology"),
        level=8
    )
def find_mailfolder_server(self, mailfolder):
    """
        Return the server that holds ``mailfolder``.

        ``self.server`` is returned when:

        - the folder name cannot be parsed, or has fewer than two path
          components;
        - listing the folder yields no results;
        - no murder topology was detected at login;
        - the server annotation could not be read after 20 attempts, one
          second apart.

        Otherwise the value of the
        "/shared/vendor/cmu/cyrus-imapd/server" annotation is returned and
        remembered in ``self.mbox``. A remembered value is only returned
        directly when it differs from ``self.server``.

        For folders with a domain, the lookup is done on the top-level
        mailbox ("prefix<sep>mailbox@domain"); folders without a domain
        are looked up under the name that was passed in.
    """
    _mailfolder = self.parse_mailfolder(mailfolder)

    # parse_mailfolder() returns None for names it rejects, and both a
    # namespace prefix and a mailbox name are popped off below.
    if _mailfolder is None or len(_mailfolder['path_parts']) < 2:
        return self.server

    prefix = _mailfolder['path_parts'].pop(0)
    mbox = _mailfolder['path_parts'].pop(0)

    if _mailfolder['domain'] is not None:
        mailfolder = "%s%s%s@%s" % (
            prefix,
            self.separator,
            mbox,
            _mailfolder['domain']
        )

    # TODO: Workaround for undelete
    if len(self.lm(mailfolder)) < 1:
        return self.server

    # TODO: Murder capabilities may have been suppressed using Cyrus IMAP
    # configuration.
    if not self.murder:
        return self.server

    log.debug(
        _(
            "Checking actual backend server for folder %s "
            "through annotations"
        ) % (
            mailfolder
        ),
        level=8
    )

    if mailfolder in self.mbox:
        log.debug(
            _(
                "Possibly reproducing the find "
                "mailfolder server answer from "
                "previously detected and stored "
                "annotation value: %r"
            ) % (
                self.mbox[mailfolder]
            ),
            level=8
        )

        if not self.mbox[mailfolder] == self.server:
            return self.mbox[mailfolder]

    max_tries = 20
    num_try = 0

    annotation_path = "/shared/vendor/cmu/cyrus-imapd/server"

    while True:
        num_try += 1

        annotations = self._getannotation(
            mailfolder,
            annotation_path
        )

        if mailfolder in annotations:
            break

        if max_tries <= num_try:
            log.error(
                _("Could not get the annotations after %s tries.") % (
                    num_try
                )
            )

            # Give up and pretend the annotation points at this server.
            annotations = {
                mailfolder: {
                    annotation_path: self.server
                }
            }

            break

        log.warning(
            _("No annotations for %s: %r") % (
                mailfolder,
                annotations
            )
        )

        time.sleep(1)

    server = annotations[mailfolder][annotation_path]
    self.mbox[mailfolder] = server

    log.debug(
        _("Server for INBOX folder %s is %s") % (
            mailfolder,
            server
        ),
        level=8
    )

    return server
def folder_utf7(self, folder):
    """
        Return ``folder`` as encoded by pykolab.imap_utf7.encode().
    """
    from pykolab import imap_utf7

    encoded = imap_utf7.encode(folder)
    return encoded
def folder_utf8(self, folder):
    """
        Return ``folder`` as decoded by pykolab.imap_utf7.decode().
    """
    from pykolab import imap_utf7

    decoded = imap_utf7.decode(folder)
    return decoded
def _setquota(self, mailfolder, quota):
    """
        Look up the server holding ``mailfolder``, call connect() for it,
        then set the folder's quota to ``quota``.

        Failure to set the quota is logged as an error and otherwise
        ignored; nothing is returned.
    """
    server = self.find_mailfolder_server(mailfolder)
    self.connect(self.uri.replace(self.server, server))

    log.debug(
        _("Setting quota for folder %s to %s") % (
            mailfolder,
            quota
        ),
        level=8
    )

    try:
        self.m.setquota(mailfolder, quota)
    except Exception:
        # Best effort, but do not swallow KeyboardInterrupt/SystemExit
        # the way a bare except would.
        log.error(
            _("Could not set quota for mailfolder %s") % (
                mailfolder
            )
        )
def _rename(self, from_mailfolder, to_mailfolder, partition=None):
    """
        Look up the server holding ``from_mailfolder``, call connect() for
        it, then rename the folder to ``to_mailfolder``.

        Both folder names are passed through folder_utf7() first. When
        ``partition`` is given it is sent along as a double-quoted third
        argument; when it is None, no partition argument is sent at all.
    """
    server = self.find_mailfolder_server(from_mailfolder)
    self.connect(self.uri.replace(self.server, server))

    if partition is not None:
        log.debug(
            _("Moving INBOX folder %s to %s on partition %s") % (
                from_mailfolder,
                to_mailfolder,
                partition
            ),
            level=8
        )
    else:
        log.debug(
            _("Moving INBOX folder %s to %s") % (
                from_mailfolder,
                to_mailfolder
            ),
            level=8
        )

    rename_args = [
        self.folder_utf7(from_mailfolder),
        self.folder_utf7(to_mailfolder)
    ]

    # Formatting None unconditionally would send the literal text "None"
    # as the partition name.
    # NOTE(review): assumes self.m.rename() accepts a call without the
    # partition argument, as imaplib's rename() does -- confirm against
    # cyruslib.
    if partition is not None:
        rename_args.append('"%s"' % (partition))

    self.m.rename(*rename_args)
def _getannotation(self, *args, **kw):
return self.getannotation(*args, **kw)
def _setannotation(self, mailfolder, annotation, value, shared=False):
    """
        Set ``annotation`` to ``value`` on ``mailfolder``.

        The server holding the folder is looked up first, falling back to
        ``self.server`` when the lookup fails.

        NOTE(review): the looked-up server is not used afterwards -- no
        connect() call is made here, unlike in _setquota() and _rename().
        Confirm whether that is intentional.

        A cyruslib.CYRUSError while setting the annotation is logged as an
        error and otherwise ignored; nothing is returned.
    """
    try:
        server = self.find_mailfolder_server(mailfolder)
    except Exception:
        # Best effort, but let KeyboardInterrupt/SystemExit through.
        server = self.server

    log.debug(
        _("Setting annotation %s on folder %s") % (
            annotation,
            mailfolder
        ),
        level=8
    )

    try:
        self.setannotation(mailfolder, annotation, value, shared)
    except cyruslib.CYRUSError as errmsg:
        log.error(
            _("Could not set annotation %r on mail folder %r: %r") % (
                annotation,
                mailfolder,
                errmsg
            )
        )
def _xfer(self, mailfolder, current_server, new_server):
    """
        Call connect() for ``current_server``, then hand ``mailfolder``
        and ``new_server`` to xfer().
    """
    source_uri = self.uri.replace(self.server, current_server)
    self.connect(source_uri)

    message = _("Transferring folder %s from %s to %s") % (
        mailfolder,
        current_server,
        new_server
    )
    log.debug(message, level=8)

    self.xfer(mailfolder, new_server)
def undelete_mailfolder(
        self,
        mailfolder,
        to_mailfolder=None,
        recursive=True
    ):
    """
        Login to the actual backend server, then "undelete" the mailfolder.

        'mailfolder' may be a string representing either of the following
        two options;

        - the fully qualified path of the deleted folder in its current
          location, such as, for a deleted INBOX folder originally known as
          "user/userid[@domain]";

            "DELETED/user/userid/hex[@domain]"

        - the original folder name, such as;

            "user/userid[@domain]"

        'to_mailfolder' may be the target folder to "undelete" the deleted
        folder to. If not specified, the original folder name is used.

        'recursive' is accepted but not used by this method.

        Folders are only transferred and renamed when the configuration
        has a 'dry_run' attribute that is false; otherwise the intended
        actions are written to standard output.

        A 'mailfolder' or 'to_mailfolder' that cannot be parsed is logged
        as an error and nothing is undeleted. A found folder that cannot
        be parsed is logged as a warning and skipped.
    """
    # Placeholder for folders we have recovered already.
    target_folders = []

    source_name = mailfolder
    mailfolder = self.parse_mailfolder(mailfolder)

    # parse_mailfolder() returns None for names it rejects.
    if mailfolder is None:
        log.error(
            _("Could not parse mail folder %r") % (source_name)
        )
        return

    undelete_folders = self._find_deleted_folder(mailfolder)

    if to_mailfolder is not None:
        target_mbox = self.parse_mailfolder(to_mailfolder)

        if target_mbox is None:
            log.error(
                _("Could not parse mail folder %r") % (to_mailfolder)
            )
            return
    else:
        target_mbox = mailfolder

    for undelete_folder in undelete_folders:
        undelete_mbox = self.parse_mailfolder(undelete_folder)

        if undelete_mbox is None:
            log.warning(
                _("Could not parse mail folder %r") % (undelete_folder)
            )
            continue

        prefix = undelete_mbox['path_parts'].pop(0)
        mbox = undelete_mbox['path_parts'].pop(0)

        if to_mailfolder is None:
            target_folder = self.separator.join([prefix, mbox])
        else:
            # Recover underneath the requested target folder, in a
            # sub-folder named after the original mailbox.
            target_folder = "%s%s%s" % (
                self.separator.join(target_mbox['path_parts']),
                self.separator,
                mbox
            )

        if not len(undelete_mbox['path_parts']) == 0:
            target_folder = "%s%s%s" % (
                target_folder,
                self.separator,
                self.separator.join(undelete_mbox['path_parts'])
            )

        # A target name that was already used in this run gets the hex
        # timestamp of the deleted folder appended.
        if target_folder in target_folders:
            target_folder = "%s%s%s" % (
                target_folder,
                self.separator,
                undelete_mbox['hex_timestamp']
            )

        target_folders.append(target_folder)

        if target_mbox['domain'] is not None:
            target_folder = "%s@%s" % (
                target_folder,
                target_mbox['domain']
            )

        log.info(
            _("Undeleting %s to %s") % (
                undelete_folder,
                target_folder
            )
        )

        target_server = self.find_mailfolder_server(target_folder)

        if hasattr(conf, 'dry_run') and not conf.dry_run:
            if not target_server == self.server:
                self.xfer(undelete_folder, target_server)

            self.rename(undelete_folder, target_folder)
        else:
            if not target_server == self.server:
                sys.stdout.write(
                    _("Would have transferred %s from %s to %s") % (
                        undelete_folder,
                        self.server,
                        target_server
                    ) + "\n"
                )

            sys.stdout.write(
                _("Would have renamed %s to %s") % (
                    undelete_folder,
                    target_folder
                ) + "\n"
            )
def parse_mailfolder(self, mailfolder):
    """
        Parse a mailfolder name into its parts.

        Takes a fully qualified mailfolder or mailfolder sub-folder and
        returns a dictionary with the keys;

        - 'domain', the part after the '@', or None if there is none,

        - 'path_parts', the list of path components, without the deleted
          prefix and without the hexadecimal timestamp,

        - 'hex_timestamp', the last path component; only present when
          the name did not start with 'user' or 'shared'.

        Returns None for a name that starts with neither 'user' nor
        'shared' and whose last path component is not a hexadecimal
        timestamp, or for which listing the folder gives no results.

        Side effect: a name that contains a '/' causes ``self.separator``
        to be set to '/'.
    """
    mbox = {
        'domain': None
    }

    if '/' in mailfolder:
        self.separator = '/'

    # Split off the virtual domain identifier, if any
    at_parts = mailfolder.split('@')

    if len(at_parts) > 1:
        mbox['domain'] = at_parts[1]

    mbox['path_parts'] = at_parts[0].split(self.separator)

    # See if the path that has been specified is the current location for
    # the deleted folder, or the original location, we have to find the
    # deleted folder for.
    if mbox['path_parts'][0] not in ('user', 'shared'):
        deleted_prefix = mbox['path_parts'].pop(0)

        # See if the hexadecimal timestamp is actually hexadecimal.
        # This prevents "DELETED/user/userid/Sent", but not
        # "DELETED/user/userid/FFFFFF" from being specified.
        try:
            epoch = int(mbox['path_parts'][-1], 16)

            # Only called to check the value is a usable timestamp.
            time.asctime(time.gmtime(epoch))
        except (IndexError, ValueError, OverflowError, OSError):
            return None

        # Verify that the input for the deleted folder is actually a
        # deleted folder.
        verify_folder_search = "%(dp)s%(sep)s%(mailfolder)s" % {
            'dp': deleted_prefix,
            'sep': self.separator,
            'mailfolder': self.separator.join(mbox['path_parts'])
        }

        if mbox['domain'] is not None:
            verify_folder_search = "%s@%s" % (
                verify_folder_search,
                mbox['domain']
            )

        if ' ' in verify_folder_search:
            folders = self.lm(
                '"%s"' % self.folder_utf7(verify_folder_search)
            )
        else:
            folders = self.lm(self.folder_utf7(verify_folder_search))

        # NOTE: Case also covered is valid hexadecimal folders; won't be
        # the actual check as intended, but doesn't give you anyone else's
        # data unless... See the following:
        #
        # TODO: Case not covered is usernames that are hexadecimal.
        #
        # We could probably attempt to convert the int(hex) into a
        # time.gmtime(), but it still would not cover all cases.
        #

        # If no folders were found... well... then there you go.
        if len(folders) < 1:
            return None

        # Pop off the hex timestamp, which turned out to be valid
        mbox['hex_timestamp'] = mbox['path_parts'].pop()

    return mbox
def _find_deleted_folder(self, mbox):
"""
Give me the parts that are in an original mailfolder name and I'll
find the deleted folder name.
TODO: It finds virtdomain folders for non-virtdomain searches.
"""
deleted_folder_search = "%(deleted_prefix)s%(separator)s" + \
- "%(mailfolder)s%(separator)s*" % {
- # TODO: The prefix used is configurable
- 'deleted_prefix': "DELETED",
- 'mailfolder': self.separator.join(mbox['path_parts']),
- 'separator': self.separator,
- }
+ "%(mailfolder)s%(separator)s*" % {
+ # TODO: The prefix used is configurable
+ 'deleted_prefix': "DELETED",
+ 'mailfolder': self.separator.join(mbox['path_parts']),
+ 'separator': self.separator,
+ }
- if not mbox['domain'] == None:
+ if mbox['domain'] is not None:
deleted_folder_search = "%s@%s" % (
deleted_folder_search,
mbox['domain']
)
folders = self.lm(deleted_folder_search)
# The folders we have found at this stage include virtdomain folders.
#
# For example, having searched for user/userid, it will also find
# user/userid@example.org
#
# Here, we explicitly remove any virtdomain folders.
- if mbox['domain'] == None:
+ if mbox['domain'] is None:
_folders = []
for folder in folders:
if len(folder.split('@')) < 2:
_folders.append(folder)
folders = _folders
return folders

File Metadata

Mime Type
text/x-diff
Expires
Sat, Apr 4, 2:56 AM (4 d, 35 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18822311
Default Alt Text
(20 KB)

Event Timeline