Changeset View
Changeset View
Standalone View
Standalone View
cyruslib.py
Show All 34 Lines | try: | ||||
import imaplib | import imaplib | ||||
import re | import re | ||||
from binascii import b2a_base64 | from binascii import b2a_base64 | ||||
except ImportError, e: | except ImportError, e: | ||||
print e | print e | ||||
exit(1) | exit(1) | ||||
Commands = { | Commands = { | ||||
'RECONSTRUCT' : ('AUTH',), | 'RECONSTRUCT' : ('AUTH',), | ||||
Lint: PEP8 E203: whitespace before ':' | |||||
'DUMP' : ('AUTH',), # To check admin status | 'DUMP' : ('AUTH',), # To check admin status | ||||
Lint: PEP8 E203 whitespace before ':' Lint: PEP8 E203: whitespace before ':' | |||||
'ID' : ('AUTH',), # Only one ID allowed in non auth mode | 'ID' : ('AUTH',), # Only one ID allowed in non auth mode | ||||
Lint: PEP8 E203 whitespace before ':' Lint: PEP8 E203: whitespace before ':' | |||||
'GETANNOTATION': ('AUTH',), | 'GETANNOTATION': ('AUTH',), | ||||
'SETANNOTATION': ('AUTH',), | 'SETANNOTATION': ('AUTH',), | ||||
'XFER' : ('AUTH',) | 'XFER' : ('AUTH',) | ||||
Lint: PEP8 E203 whitespace before ':' Lint: PEP8 E203: whitespace before ':' | |||||
} | } | ||||
imaplib.Commands.update(Commands) | imaplib.Commands.update(Commands) | ||||
DEFAULT_SEP = '.' | DEFAULT_SEP = '.' | ||||
QUOTE = '"' | QUOTE = '"' | ||||
DQUOTE = '""' | DQUOTE = '""' | ||||
Show All 11 Lines | |||||
def unquote(text, qchar=QUOTE): | def unquote(text, qchar=QUOTE): | ||||
return ''.join(text.split(qchar)) | return ''.join(text.split(qchar)) | ||||
def getflags(test): | def getflags(test): | ||||
flags = [] | flags = [] | ||||
for flag in test.split('\\'): | for flag in test.split('\\'): | ||||
flag = flag.strip() | flag = flag.strip() | ||||
if len(flag): flags.append(flag) | if len(flag): flags.append(flag) | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
return flags | return flags | ||||
### A smart function to return an array of split strings | ### A smart function to return an array of split strings | ||||
### and honours quoted strings | ### and honours quoted strings | ||||
def splitquote(text): | def splitquote(text): | ||||
data = text.split(QUOTE) | data = text.split(QUOTE) | ||||
if len(data) == 1: # no quotes | if len(data) == 1: # no quotes | ||||
res = data[0].split() | res = data[0].split() | ||||
else: | else: | ||||
res = [] | res = [] | ||||
for match in data: | for match in data: | ||||
if len(match.strip()) == 0: continue | if len(match.strip()) == 0: continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
if match[0] == ' ': | if match[0] == ' ': | ||||
res = res + match.strip().split() | res = res + match.strip().split() | ||||
else: | else: | ||||
res.append(match) | res.append(match) | ||||
return res | return res | ||||
### return a dictionary from a cyrus info response | ### return a dictionary from a cyrus info response | ||||
def res2dict(data): | def res2dict(data): | ||||
data = splitquote(data) | data = splitquote(data) | ||||
datalen = len(data) | datalen = len(data) | ||||
if datalen % 2: # Unmatched pair | if datalen % 2: # Unmatched pair | ||||
return False, {} | return False, {} | ||||
res = {} | res = {} | ||||
for i in range(0, datalen, 2): | for i in range(0, datalen, 2): | ||||
res[data[i]] = data[i+1] | res[data[i]] = data[i+1] | ||||
return True, res | return True, res | ||||
class CYRUSError(Exception): pass | class CYRUSError(Exception): pass | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
class IMAP4(imaplib.IMAP4): | class IMAP4(imaplib.IMAP4): | ||||
def getsep(self): | def getsep(self): | ||||
"""Get mailbox separator""" | """Get mailbox separator""" | ||||
### yes, ugly but cyradm does it in the same way | ### yes, ugly but cyradm does it in the same way | ||||
### also more realable then calling NAMESPACE | ### also more realable then calling NAMESPACE | ||||
### and it should be also compatibile with other servers | ### and it should be also compatibile with other servers | ||||
try: | try: | ||||
▲ Show 20 Lines • Show All 261 Lines • ▼ Show 20 Lines | class CYRUS: | ||||
def __docommand(self, function, *args): | def __docommand(self, function, *args): | ||||
wrapped = getattr(self.m, function, None) | wrapped = getattr(self.m, function, None) | ||||
if wrapped is None: | if wrapped is None: | ||||
raise self.__doraise("UNKCMD") | raise self.__doraise("UNKCMD") | ||||
try: | try: | ||||
res, msg = wrapped(*args) | res, msg = wrapped(*args) | ||||
if ok(res): | if ok(res): | ||||
return res, msg | return res, msg | ||||
except Exception, info: | except Exception as info: | ||||
error = str(info).split(':').pop().strip() | error = str(info).split(':').pop().strip() | ||||
if error.upper().startswith('BAD'): | if error.upper().startswith('BAD'): | ||||
error = error.split('BAD', 1).pop().strip() | error = error.split('BAD', 1).pop().strip() | ||||
error = unquote(error[1:-1], '\'') | error = unquote(error[1:-1], '\'') | ||||
self.__doexception(function, error, *args) | self.__doexception(function, error, *args) | ||||
self.__doexception(function, msg[0], *args) | self.__doexception(function, msg[0], *args) | ||||
def xfer(self, mailbox, server): | def xfer(self, mailbox, server): | ||||
"""Xfer a mailbox to server""" | """Xfer a mailbox to server""" | ||||
return self.m._simple_command('XFER', mailbox, server) | return self.m._simple_command('XFER', mailbox, server) | ||||
def id(self): | def id(self): | ||||
self.__prepare('id') | self.__prepare('id') | ||||
res, data = self.m.id() | res, data = self.m.id() | ||||
data = data.strip() | data = data.strip() | ||||
if not res or (len(data) < 3): return False, {} | if not res or (len(data) < 3): return False, {} | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
data = data[1:-1] # Strip () | data = data[1:-1] # Strip () | ||||
res, rdata = res2dict(data) | res, rdata = res2dict(data) | ||||
if not res: | if not res: | ||||
self.__verbose( '[ID] Umatched pairs in result' ) | self.__verbose( '[ID] Umatched pairs in result' ) | ||||
return res, rdata | return res, rdata | ||||
def login(self, username, password): | def login(self, username, password): | ||||
if self.AUTH: | if self.AUTH: | ||||
self.__doexception("LOGIN", self.ERROR.get("AUTH")[1]) | self.__doexception("LOGIN", self.ERROR.get("AUTH")[1]) | ||||
try: | try: | ||||
res, msg = self.m.login(username, password) | res, msg = self.m.login(username, password) | ||||
self.AUTH = True | self.AUTH = True | ||||
self.id() | self.id() | ||||
admin = self.m.isadmin() | admin = self.m.isadmin() | ||||
except Exception, info: | except Exception as info: | ||||
self.AUTH = False | self.AUTH = False | ||||
error = str(info).split(':').pop().strip() | error = str(info).split(':').pop().strip() | ||||
self.__doexception("LOGIN", error) | self.__doexception("LOGIN", error) | ||||
if admin: | if admin: | ||||
self.ADMIN = username | self.ADMIN = username | ||||
self.SEP = self.m.getsep() | self.SEP = self.m.getsep() | ||||
Show All 16 Lines | def login_plain(self, username, password, asUser = None): | ||||
else: | else: | ||||
self.ADMIN = asUser | self.ADMIN = asUser | ||||
self.AUSER = asUser | self.AUSER = asUser | ||||
self.SEP = self.m.getsep() | self.SEP = self.m.getsep() | ||||
def logout(self): | def logout(self): | ||||
try: | try: | ||||
res, msg = self.m.logout() | res, msg = self.m.logout() | ||||
except Exception, info: | except Exception as info: | ||||
error = str(info).split(':').pop().strip() | error = str(info).split(':').pop().strip() | ||||
self.__doexception("LOGOUT", error) | self.__doexception("LOGOUT", error) | ||||
self.AUTH = False | self.AUTH = False | ||||
self.ADMIN = None | self.ADMIN = None | ||||
self.AUSER = None | self.AUSER = None | ||||
self.__verbose( '[LOGOUT] %s: %s' % (res, msg[0]) ) | self.__verbose( '[LOGOUT] %s: %s' % (res, msg[0]) ) | ||||
def getEncoding(self): | def getEncoding(self): | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def lm(self, pattern="*"): | ||||
To list all users mailboxes lm("user/*") | To list all users mailboxes lm("user/*") | ||||
To list users mailboxes startwith a word lm("user/word*") | To list users mailboxes startwith a word lm("user/word*") | ||||
To list global top folders lm("%") | To list global top folders lm("%") | ||||
To list global startwith a word unsupported by server | To list global startwith a word unsupported by server | ||||
suggestion lm("word*") | suggestion lm("word*") | ||||
""" | """ | ||||
self.__prepare('LIST') | self.__prepare('LIST') | ||||
if pattern == '': pattern = "*" | if pattern == '': pattern = "*" | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
if pattern == '%': | if pattern == '%': | ||||
res, ml = self.__docommand('list', '', '%') | res, ml = self.__docommand('list', '', '%') | ||||
else: | else: | ||||
res, ml = self.__docommand('list', '""', self.decode(pattern)) | res, ml = self.__docommand('list', '""', self.decode(pattern)) | ||||
if not ok(res): | if not ok(res): | ||||
self.__verbose( '[LIST] %s: %s' % (res, ml) ) | self.__verbose( '[LIST] %s: %s' % (res, ml) ) | ||||
return [] | return [] | ||||
if (len(ml) == 1) and ml[0] is None: | if (len(ml) == 1) and ml[0] is None: | ||||
self.__verbose( '[LIST] No results' ) | self.__verbose( '[LIST] No results' ) | ||||
return [] | return [] | ||||
mb = [] | mb = [] | ||||
for mailbox in ml: | for mailbox in ml: | ||||
res = re_mb.match(mailbox) | res = re_mb.match(mailbox) | ||||
if res is None: continue | if res is None: continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mbe = unquote(res.group(2)) | mbe = unquote(res.group(2)) | ||||
if 'Noselect' in getflags(res.group(1)): continue | if 'Noselect' in getflags(res.group(1)): continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mb.append(self.encode(mbe)) | mb.append(self.encode(mbe)) | ||||
return mb | return mb | ||||
def cm(self, mailbox, partition=None): | def cm(self, mailbox, partition=None): | ||||
"""Create mailbox""" | """Create mailbox""" | ||||
self.__prepare('CREATE', mailbox) | self.__prepare('CREATE', mailbox) | ||||
res, msg = self.__docommand('create', self.decode(mailbox), partition) | res, msg = self.__docommand('create', self.decode(mailbox), partition) | ||||
self.__verbose( '[CREATE %s partition=%s] %s: %s' % (mailbox, partition, res, msg[0]) ) | self.__verbose( '[CREATE %s partition=%s] %s: %s' % (mailbox, partition, res, msg[0]) ) | ||||
Show All 30 Lines | def lam(self, mailbox): | ||||
res, acl = self.__docommand("getacl", self.decode(mailbox)) | res, acl = self.__docommand("getacl", self.decode(mailbox)) | ||||
acls = {} | acls = {} | ||||
aclList = splitquote(acl.pop().strip()) | aclList = splitquote(acl.pop().strip()) | ||||
del aclList[0] # mailbox | del aclList[0] # mailbox | ||||
for i in range(0, len(aclList), 2): | for i in range(0, len(aclList), 2): | ||||
try: | try: | ||||
userid = self.encode(aclList[i]) | userid = self.encode(aclList[i]) | ||||
rights = aclList[i + 1] | rights = aclList[i + 1] | ||||
except Exception, info: | except Exception as info: | ||||
self.__verbose( '[GETACL %s] BAD: %s' % (mailbox, info.args[0]) ) | self.__verbose( '[GETACL %s] BAD: %s' % (mailbox, info.args[0]) ) | ||||
raise self.__doraise("GETACL") | raise self.__doraise("GETACL") | ||||
self.__verbose( '[GETACL %s] %s %s' % (mailbox, userid, rights) ) | self.__verbose( '[GETACL %s] %s %s' % (mailbox, userid, rights) ) | ||||
acls[userid] = rights | acls[userid] = rights | ||||
return acls | return acls | ||||
def sam(self, mailbox, userid, rights): | def sam(self, mailbox, userid, rights): | ||||
"""Set ACL""" | """Set ACL""" | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def lqr(self, mailbox): | ||||
self.__verbose( '[GETQUOTAROOT %s] BAD: Error while parsing results' % mailbox ) | self.__verbose( '[GETQUOTAROOT %s] BAD: Error while parsing results' % mailbox ) | ||||
return _root, None, None | return _root, None, None | ||||
def sq(self, mailbox, limit): | def sq(self, mailbox, limit): | ||||
"""Set Quota""" | """Set Quota""" | ||||
self.__prepare('SETQUOTA', mailbox) | self.__prepare('SETQUOTA', mailbox) | ||||
try: | try: | ||||
limit = int(limit) | limit = int(limit) | ||||
except ValueError, e: | except ValueError: | ||||
self.__verbose( '[SETQUOTA %s] BAD: %s %s' % (mailbox, self.ERROR.get("SETQUOTA")[1], limit) ) | self.__verbose( '[SETQUOTA %s] BAD: %s %s' % (mailbox, self.ERROR.get("SETQUOTA")[1], limit) ) | ||||
raise self.__doraise("SETQUOTA") | raise self.__doraise("SETQUOTA") | ||||
res, msg = self.__docommand("setquota", self.decode(mailbox), limit) | res, msg = self.__docommand("setquota", self.decode(mailbox), limit) | ||||
self.__verbose( '[SETQUOTA %s %s] %s: %s' % (mailbox, limit, res, msg[0]) ) | self.__verbose( '[SETQUOTA %s %s] %s: %s' % (mailbox, limit, res, msg[0]) ) | ||||
def getannotation(self, mailbox, pattern='*'): | def getannotation(self, mailbox, pattern='*'): | ||||
"""Get Annotation""" | """Get Annotation""" | ||||
self.__prepare('GETANNOTATION') | self.__prepare('GETANNOTATION') | ||||
Show All 40 Lines | def getannotation(self, mailbox, pattern='*'): | ||||
key = annotation.split('"')[3].replace('"','').replace("'","").strip() | key = annotation.split('"')[3].replace('"','').replace("'","").strip() | ||||
_annot = annotation.split('(')[1].split(')')[0].strip() | _annot = annotation.split('(')[1].split(')')[0].strip() | ||||
if not ann.has_key(folder): | if not ann.has_key(folder): | ||||
ann[folder] = {} | ann[folder] = {} | ||||
try: | try: | ||||
value_priv = _annot[(_annot.index('"value.priv"')+len('"value.priv"')):_annot.index('"size.priv"')].strip() | value_priv = _annot[(_annot.index('"value.priv"')+len('"value.priv"')):_annot.index('"size.priv"')].strip() | ||||
except ValueError, errmsg: | except ValueError: | ||||
value_priv = None | value_priv = None | ||||
try: | try: | ||||
size_priv = _annot[(_annot.index('"size.priv"')+len('"size.priv"')):].strip().split('"')[1].strip() | size_priv = _annot[(_annot.index('"size.priv"')+len('"size.priv"')):].strip().split('"')[1].strip() | ||||
try: | try: | ||||
value_priv = value_priv[value_priv.index('{%s}' % (size_priv))+len('{%s}' % (size_priv)):].strip() | value_priv = value_priv[value_priv.index('{%s}' % (size_priv))+len('{%s}' % (size_priv)):].strip() | ||||
except Exception, errmsg: | except Exception: | ||||
pass | pass | ||||
except Exception, errmsg: | except Exception: | ||||
pass | pass | ||||
if value_priv in empty_values: | if value_priv in empty_values: | ||||
value_priv = None | value_priv = None | ||||
else: | else: | ||||
try: | try: | ||||
value_priv = value_priv[:value_priv.index('"content-type.priv"')].strip() | value_priv = value_priv[:value_priv.index('"content-type.priv"')].strip() | ||||
except: | except: | ||||
Show All 10 Lines | def getannotation(self, mailbox, pattern='*'): | ||||
if value_priv.endswith('"'): | if value_priv.endswith('"'): | ||||
value_priv = value_priv[:-1] | value_priv = value_priv[:-1] | ||||
if value_priv in empty_values: | if value_priv in empty_values: | ||||
value_priv = None | value_priv = None | ||||
try: | try: | ||||
value_shared = _annot[(_annot.index('"value.shared"')+len('"value.shared"')):_annot.index('"size.shared"')].strip() | value_shared = _annot[(_annot.index('"value.shared"')+len('"value.shared"')):_annot.index('"size.shared"')].strip() | ||||
except ValueError, errmsg: | except ValueError: | ||||
value_shared = None | value_shared = None | ||||
try: | try: | ||||
size_shared = _annot[(_annot.index('"size.shared"')+len('"size.shared"')):].strip().split('"')[1].strip() | size_shared = _annot[(_annot.index('"size.shared"')+len('"size.shared"')):].strip().split('"')[1].strip() | ||||
try: | try: | ||||
value_shared = value_shared[value_shared.index('{%s}' % (size_shared))+len('{%s}' % (size_shared)):].strip() | value_shared = value_shared[value_shared.index('{%s}' % (size_shared))+len('{%s}' % (size_shared)):].strip() | ||||
except Exception, errmsg: | except Exception: | ||||
pass | pass | ||||
except Exception, errmsg: | except Exception: | ||||
pass | pass | ||||
if value_shared in empty_values: | if value_shared in empty_values: | ||||
value_shared = None | value_shared = None | ||||
else: | else: | ||||
try: | try: | ||||
value_shared = value_shared[:value_shared.index('"content-type.shared"')].strip() | value_shared = value_shared[:value_shared.index('"content-type.shared"')].strip() | ||||
except: | except: | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def reconstruct(self, mailbox, recursive=True): | ||||
for mbox in mbxList: | for mbox in mbxList: | ||||
self.__reconstruct(mbox) | self.__reconstruct(mbox) | ||||
self.__reconstruct(mailbox) | self.__reconstruct(mailbox) | ||||
def lsub(self, pattern="*"): | def lsub(self, pattern="*"): | ||||
if self.AUSER is None: | if self.AUSER is None: | ||||
self.__doexception("lsub") | self.__doexception("lsub") | ||||
self.__prepare('LSUB') | self.__prepare('LSUB') | ||||
if pattern == '': pattern = "*" | if pattern == '': pattern = "*" | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
res, ml = self.__docommand('lsub', '*', pattern) | res, ml = self.__docommand('lsub', '*', pattern) | ||||
if not ok(res): | if not ok(res): | ||||
self.__verbose( '[LIST] %s: %s' % (res, ml) ) | self.__verbose( '[LIST] %s: %s' % (res, ml) ) | ||||
return [] | return [] | ||||
if (len(ml) == 1) and ml[0] is None: | if (len(ml) == 1) and ml[0] is None: | ||||
self.__verbose( '[LIST] No results' ) | self.__verbose( '[LIST] No results' ) | ||||
return [] | return [] | ||||
mb = [] | mb = [] | ||||
for mailbox in ml: | for mailbox in ml: | ||||
res = re_mb.match(mailbox) | res = re_mb.match(mailbox) | ||||
if res is None: continue | if res is None: continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mbe = unquote(res.group(2)) | mbe = unquote(res.group(2)) | ||||
if 'Noselect' in getflags(res.group(1)): continue | if 'Noselect' in getflags(res.group(1)): continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mb.append(self.encode(mbe)) | mb.append(self.encode(mbe)) | ||||
return mb | return mb | ||||
def subscribe(self, mailbox): | def subscribe(self, mailbox): | ||||
"""Subscribe""" | """Subscribe""" | ||||
self.__prepare('SUBSCRIBE') | self.__prepare('SUBSCRIBE') | ||||
res, msg = self.__docommand("subscribe", self.decode(mailbox)) | res, msg = self.__docommand("subscribe", self.decode(mailbox)) | ||||
self.__verbose( '[SUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | self.__verbose( '[SUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | ||||
def unsubscribe(self, mailbox): | def unsubscribe(self, mailbox): | ||||
"""Unsubscribe""" | """Unsubscribe""" | ||||
self.__prepare('UNSUBSCRIBE') | self.__prepare('UNSUBSCRIBE') | ||||
res, msg = self.__docommand("unsubscribe", self.decode(mailbox)) | res, msg = self.__docommand("unsubscribe", self.decode(mailbox)) | ||||
self.__verbose( '[UNSUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | self.__verbose( '[UNSUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | ||||
whitespace before ':'