Changeset View
Changeset View
Standalone View
Standalone View
cyruslib.py
Show All 31 Lines | |||||
""" | """ | ||||
from sys import exit, stdout | from sys import exit, stdout | ||||
try: | try: | ||||
import imaplib | import imaplib | ||||
import re | import re | ||||
from binascii import b2a_base64 | from binascii import b2a_base64 | ||||
except ImportError, e: | except ImportError as e: | ||||
print(e) | print(e) | ||||
exit(1) | exit(1) | ||||
Commands = { | Commands = { | ||||
'RECONSTRUCT' : ('AUTH',), | 'RECONSTRUCT' : ('AUTH',), | ||||
Lint: PEP8 E203: whitespace before ':' | |||||
'DUMP' : ('AUTH',), # To check admin status | 'DUMP' : ('AUTH',), # To check admin status | ||||
Lint: PEP8 E203 whitespace before ':' Lint: PEP8 E203: whitespace before ':' | |||||
'ID' : ('AUTH',), # Only one ID allowed in non auth mode | 'ID' : ('AUTH',), # Only one ID allowed in non auth mode | ||||
Lint: PEP8 E203 whitespace before ':' Lint: PEP8 E203: whitespace before ':' | |||||
'GETANNOTATION': ('AUTH',), | 'GETANNOTATION': ('AUTH',), | ||||
'SETANNOTATION': ('AUTH',), | 'SETANNOTATION': ('AUTH',), | ||||
'XFER' : ('AUTH',) | 'XFER' : ('AUTH',) | ||||
Lint: PEP8 E203 whitespace before ':' Lint: PEP8 E203: whitespace before ':' | |||||
} | } | ||||
imaplib.Commands.update(Commands) | imaplib.Commands.update(Commands) | ||||
DEFAULT_SEP = '.' | DEFAULT_SEP = '.' | ||||
QUOTE = '"' | QUOTE = '"' | ||||
DQUOTE = '""' | DQUOTE = '""' | ||||
Show All 11 Lines | |||||
def unquote(text, qchar=QUOTE): | def unquote(text, qchar=QUOTE): | ||||
return ''.join(text.split(qchar)) | return ''.join(text.split(qchar)) | ||||
def getflags(test): | def getflags(test): | ||||
flags = [] | flags = [] | ||||
for flag in test.split('\\'): | for flag in test.split('\\'): | ||||
flag = flag.strip() | flag = flag.strip() | ||||
if len(flag): flags.append(flag) | if len(flag): flags.append(flag) | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
return flags | return flags | ||||
### A smart function to return an array of split strings | ### A smart function to return an array of split strings | ||||
### and honours quoted strings | ### and honours quoted strings | ||||
def splitquote(text): | def splitquote(text): | ||||
data = text.split(QUOTE) | data = text.split(QUOTE) | ||||
if len(data) == 1: # no quotes | if len(data) == 1: # no quotes | ||||
res = data[0].split() | res = data[0].split() | ||||
else: | else: | ||||
res = [] | res = [] | ||||
for match in data: | for match in data: | ||||
if len(match.strip()) == 0: continue | if len(match.strip()) == 0: continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
if match[0] == ' ': | if match[0] == ' ': | ||||
res = res + match.strip().split() | res = res + match.strip().split() | ||||
else: | else: | ||||
res.append(match) | res.append(match) | ||||
return res | return res | ||||
### return a dictionary from a cyrus info response | ### return a dictionary from a cyrus info response | ||||
def res2dict(data): | def res2dict(data): | ||||
data = splitquote(data) | data = splitquote(data) | ||||
datalen = len(data) | datalen = len(data) | ||||
if datalen % 2: # Unmatched pair | if datalen % 2: # Unmatched pair | ||||
return False, {} | return False, {} | ||||
res = {} | res = {} | ||||
for i in range(0, datalen, 2): | for i in range(0, datalen, 2): | ||||
res[data[i]] = data[i+1] | res[data[i]] = data[i+1] | ||||
return True, res | return True, res | ||||
class CYRUSError(Exception): pass | class CYRUSError(Exception): pass | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
class IMAP4(imaplib.IMAP4): | class IMAP4(imaplib.IMAP4): | ||||
def getsep(self): | def getsep(self): | ||||
"""Get mailbox separator""" | """Get mailbox separator""" | ||||
### yes, ugly but cyradm does it in the same way | ### yes, ugly but cyradm does it in the same way | ||||
### also more realable then calling NAMESPACE | ### also more realable then calling NAMESPACE | ||||
### and it should be also compatibile with other servers | ### and it should be also compatibile with other servers | ||||
try: | try: | ||||
▲ Show 20 Lines • Show All 277 Lines • ▼ Show 20 Lines | class CYRUS: | ||||
def xfer(self, mailbox, server): | def xfer(self, mailbox, server): | ||||
"""Xfer a mailbox to server""" | """Xfer a mailbox to server""" | ||||
return self.m._simple_command('XFER', mailbox, server) | return self.m._simple_command('XFER', mailbox, server) | ||||
def id(self): | def id(self): | ||||
self.__prepare('id') | self.__prepare('id') | ||||
res, data = self.m.id() | res, data = self.m.id() | ||||
data = data.strip() | data = data.strip() | ||||
if not res or (len(data) < 3): return False, {} | if not res or (len(data) < 3): return False, {} | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
data = data[1:-1] # Strip () | data = data[1:-1] # Strip () | ||||
res, rdata = res2dict(data) | res, rdata = res2dict(data) | ||||
if not res: | if not res: | ||||
self.__verbose( '[ID] Umatched pairs in result' ) | self.__verbose( '[ID] Umatched pairs in result' ) | ||||
return res, rdata | return res, rdata | ||||
def login(self, username, password): | def login(self, username, password): | ||||
if self.AUTH: | if self.AUTH: | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | def lm(self, pattern="*"): | ||||
To list all users mailboxes lm("user/*") | To list all users mailboxes lm("user/*") | ||||
To list users mailboxes startwith a word lm("user/word*") | To list users mailboxes startwith a word lm("user/word*") | ||||
To list global top folders lm("%") | To list global top folders lm("%") | ||||
To list global startwith a word unsupported by server | To list global startwith a word unsupported by server | ||||
suggestion lm("word*") | suggestion lm("word*") | ||||
""" | """ | ||||
self.__prepare('LIST') | self.__prepare('LIST') | ||||
if pattern == '': pattern = "*" | if pattern == '': pattern = "*" | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
if pattern == '%': | if pattern == '%': | ||||
res, ml = self.__docommand('list', '', '%') | res, ml = self.__docommand('list', '', '%') | ||||
else: | else: | ||||
res, ml = self.__docommand('list', '""', self.decode(pattern)) | res, ml = self.__docommand('list', '""', self.decode(pattern)) | ||||
if not ok(res): | if not ok(res): | ||||
self.__verbose( '[LIST] %s: %s' % (res, ml) ) | self.__verbose( '[LIST] %s: %s' % (res, ml) ) | ||||
return [] | return [] | ||||
if (len(ml) == 1) and ml[0] is None: | if (len(ml) == 1) and ml[0] is None: | ||||
self.__verbose( '[LIST] No results' ) | self.__verbose( '[LIST] No results' ) | ||||
return [] | return [] | ||||
mb = [] | mb = [] | ||||
for mailbox in ml: | for mailbox in ml: | ||||
res = re_mb.match(mailbox) | res = re_mb.match(mailbox) | ||||
if res is None: continue | if res is None: continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mbe = unquote(res.group(2)) | mbe = unquote(res.group(2)) | ||||
if 'Noselect' in getflags(res.group(1)): continue | if 'Noselect' in getflags(res.group(1)): continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mb.append(self.encode(mbe)) | mb.append(self.encode(mbe)) | ||||
return mb | return mb | ||||
def cm(self, mailbox, partition=None): | def cm(self, mailbox, partition=None): | ||||
"""Create mailbox""" | """Create mailbox""" | ||||
self.__prepare('CREATE', mailbox) | self.__prepare('CREATE', mailbox) | ||||
res, msg = self.__docommand('create', self.decode(mailbox), partition) | res, msg = self.__docommand('create', self.decode(mailbox), partition) | ||||
self.__verbose( '[CREATE %s partition=%s] %s: %s' % (mailbox, partition, res, msg[0]) ) | self.__verbose( '[CREATE %s partition=%s] %s: %s' % (mailbox, partition, res, msg[0]) ) | ||||
▲ Show 20 Lines • Show All 252 Lines • ▼ Show 20 Lines | def reconstruct(self, mailbox, recursive=True): | ||||
for mbox in mbxList: | for mbox in mbxList: | ||||
self.__reconstruct(mbox) | self.__reconstruct(mbox) | ||||
self.__reconstruct(mailbox) | self.__reconstruct(mailbox) | ||||
def lsub(self, pattern="*"): | def lsub(self, pattern="*"): | ||||
if self.AUSER is None: | if self.AUSER is None: | ||||
self.__doexception("lsub") | self.__doexception("lsub") | ||||
self.__prepare('LSUB') | self.__prepare('LSUB') | ||||
if pattern == '': pattern = "*" | if pattern == '': pattern = "*" | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
res, ml = self.__docommand('lsub', '*', pattern) | res, ml = self.__docommand('lsub', '*', pattern) | ||||
if not ok(res): | if not ok(res): | ||||
self.__verbose( '[LIST] %s: %s' % (res, ml) ) | self.__verbose( '[LIST] %s: %s' % (res, ml) ) | ||||
return [] | return [] | ||||
if (len(ml) == 1) and ml[0] is None: | if (len(ml) == 1) and ml[0] is None: | ||||
self.__verbose( '[LIST] No results' ) | self.__verbose( '[LIST] No results' ) | ||||
return [] | return [] | ||||
mb = [] | mb = [] | ||||
for mailbox in ml: | for mailbox in ml: | ||||
res = re_mb.match(mailbox) | res = re_mb.match(mailbox) | ||||
if res is None: continue | if res is None: continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mbe = unquote(res.group(2)) | mbe = unquote(res.group(2)) | ||||
if 'Noselect' in getflags(res.group(1)): continue | if 'Noselect' in getflags(res.group(1)): continue | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
mb.append(self.encode(mbe)) | mb.append(self.encode(mbe)) | ||||
return mb | return mb | ||||
def subscribe(self, mailbox): | def subscribe(self, mailbox): | ||||
"""Subscribe""" | """Subscribe""" | ||||
self.__prepare('SUBSCRIBE') | self.__prepare('SUBSCRIBE') | ||||
res, msg = self.__docommand("subscribe", self.decode(mailbox)) | res, msg = self.__docommand("subscribe", self.decode(mailbox)) | ||||
self.__verbose( '[SUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | self.__verbose( '[SUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | ||||
def unsubscribe(self, mailbox): | def unsubscribe(self, mailbox): | ||||
"""Unsubscribe""" | """Unsubscribe""" | ||||
self.__prepare('UNSUBSCRIBE') | self.__prepare('UNSUBSCRIBE') | ||||
res, msg = self.__docommand("unsubscribe", self.decode(mailbox)) | res, msg = self.__docommand("unsubscribe", self.decode(mailbox)) | ||||
self.__verbose( '[UNSUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | self.__verbose( '[UNSUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) ) | ||||
whitespace before ':'