Page MenuHomePhorge

cyruslib.py
No OneTemporary

Authored By
Unknown
Size
26 KB
Referenced Files
None
Subscribers
None

cyruslib.py

# -*- coding: utf-8 -*-
#
# Cyruslib v0.8.5-20090401
# Copyright (C) 2007-2009 Reinaldo de Carvalho <reinaldoc@gmail.com>
# Copyright (C) 2003-2006 Gianluigi Tiesi <sherpya@netfarm.it>
# Copyright (C) 2003-2006 NetFarm S.r.l. [http://www.netfarm.it]
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
#
# Requires python >= 2.3
#
__version__ = '0.8.5'
__all__ = [ 'CYRUS' ]
__doc__ = """Cyrus admin wrapper
Adds cyrus-specific commands to imaplib IMAP4 Class
and defines new CYRUS class for cyrus imapd commands
"""
from sys import exit, stdout
try:
import imaplib
import re
from binascii import b2a_base64
except ImportError, e:
print e
exit(1)
Commands = {
'RECONSTRUCT' : ('AUTH',),
'DUMP' : ('AUTH',), # To check admin status
'ID' : ('AUTH',), # Only one ID allowed in non auth mode
'GETANNOTATION': ('AUTH',),
'SETANNOTATION': ('AUTH',),
'XFER' : ('AUTH',)
}
imaplib.Commands.update(Commands)
DEFAULT_SEP = '.'
QUOTE = '"'
DQUOTE = '""'
re_ns = re.compile(r'.*\(\(\".*(\.|/)\"\)\).*')
re_q0 = re.compile(r'(.*)\s\(\)')
re_q = re.compile(r'(.*)\s\(STORAGE (\d+) (\d+)\)')
re_mb = re.compile(r'\((.*)\)\s\".\"\s(.*)')
re_url = re.compile(r'^(imaps?)://(.+?):?(\d{0,5})$')
def ok(res):
return res.upper().startswith('OK')
def quote(text, qchar=QUOTE):
return text.join([qchar, qchar])
def unquote(text, qchar=QUOTE):
return ''.join(text.split(qchar))
def getflags(test):
flags = []
for flag in test.split('\\'):
flag = flag.strip()
if len(flag): flags.append(flag)
return flags
### A smart function to return an array of splitted strings
### and honours quoted strings
def splitquote(text):
data = text.split(QUOTE)
if len(data) == 1: # no quotes
res = data[0].split()
else:
res = []
for match in data:
if len(match.strip()) == 0: continue
if match[0] == ' ':
res = res + match.strip().split()
else:
res.append(match)
return res
### return a dictionary from a cyrus info response
def res2dict(data):
data = splitquote(data)
datalen = len(data)
if datalen % 2: # Unmatched pair
return False, {}
res = {}
for i in range(0, datalen, 2):
res[data[i]] = data[i+1]
return True, res
class CYRUSError(Exception): pass
class IMAP4(imaplib.IMAP4):
def getsep(self):
"""Get mailbox separator"""
### yes, ugly but cyradm does it in the same way
### also more realable then calling NAMESPACE
### and it should be also compatibile with other servers
try:
return unquote(self.list(DQUOTE, DQUOTE)[1][0]).split()[1]
except:
return DEFAULT_SEP
def isadmin(self):
### A trick to check if the user is admin or not
### normal users cannot use dump command
try:
res, msg = self._simple_command('DUMP', 'NIL')
if msg[0].lower().find('denied') == -1:
return True
except:
pass
return False
def id(self):
try:
typ, dat = self._simple_command('ID', 'NIL')
res, dat = self._untagged_response(typ, dat, 'ID')
except:
return False, dat[0]
return ok(res), dat[0]
def getannotation(self, mailbox, pattern='*', shared=None):
if shared == None:
typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('*'))
elif shared:
typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('value.shared'))
else:
typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('value.priv'))
return self._untagged_response(typ, dat, 'ANNOTATION')
def setannotation(self, mailbox, desc, value, shared=False):
if value:
value = quote(value)
else:
value = "NIL"
if shared:
typ, dat = self._simple_command('SETANNOTATION', mailbox, quote(desc), "(%s %s)" % (quote('value.shared'), value) )
else:
typ, dat = self._simple_command('SETANNOTATION', mailbox, quote(desc), "(%s %s)" % (quote('value.priv'), value) )
return self._untagged_response(typ, dat, 'ANNOTATION')
def setquota(self, mailbox, limit):
"""Set quota of a mailbox"""
if limit == 0:
quota = '()'
else:
quota = '(STORAGE %s)' % limit
return self._simple_command('SETQUOTA', mailbox, quota)
### Overridden to support partition
### Pychecker will complain about non matching signature
def create(self, mailbox, partition=None):
"""Create a mailbox, partition is optional"""
if partition is not None:
return self._simple_command('CREATE', mailbox, partition)
else:
return self._simple_command('CREATE', mailbox)
### Overridden to support partition
### Pychecker: same here
def rename(self, from_mailbox, to_mailbox, partition=None):
"""Rename a from_mailbox to to_mailbox, partition is optional"""
if partition is not None:
return self._simple_command('RENAME', from_mailbox, to_mailbox, partition)
else:
return self._simple_command('RENAME', from_mailbox, to_mailbox)
def reconstruct(self, mailbox):
return self._simple_command('RECONSTRUCT', mailbox)
class IMAP4_SSL(imaplib.IMAP4_SSL):
def getsep(self):
"""Get mailbox separator"""
### yes, ugly but cyradm does it in the same way
### also more realable then calling NAMESPACE
### and it should be also compatibile with other servers
try:
return unquote(self.list(DQUOTE, DQUOTE)[1][0]).split()[1]
except:
return DEFAULT_SEP
def isadmin(self):
### A trick to check if the user is admin or not
### normal users cannot use dump command
try:
res, msg = self._simple_command('DUMP', 'NIL')
if msg[0].lower().find('denied') == -1:
return True
except:
pass
return False
def id(self):
try:
typ, dat = self._simple_command('ID', 'NIL')
res, dat = self._untagged_response(typ, dat, 'ID')
except:
return False, dat[0]
return ok(res), dat[0]
def getannotation(self, mailbox, pattern='*', shared=None):
if shared == None:
typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('*'))
elif shared:
typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('value.shared'))
else:
typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('value.priv'))
return self._untagged_response(typ, dat, 'ANNOTATION')
def setannotation(self, mailbox, desc, value, shared=False):
if value:
value = quote(value)
else:
value = "NIL"
if shared:
typ, dat = self._simple_command('SETANNOTATION', mailbox, quote(desc), "(%s %s)" % (quote('value.shared'), value) )
else:
typ, dat = self._simple_command('SETANNOTATION', mailbox, quote(desc), "(%s %s)" % (quote('value.priv'), value) )
return self._untagged_response(typ, dat, 'ANNOTATION')
def setquota(self, mailbox, limit):
"""Set quota of a mailbox"""
if limit == 0:
quota = '()'
else:
quota = '(STORAGE %s)' % limit
return self._simple_command('SETQUOTA', mailbox, quota)
### Overridden to support partition
### Pychecker will complain about non matching signature
def create(self, mailbox, partition=None):
"""Create a mailbox, partition is optional"""
if partition is not None:
return self._simple_command('CREATE', mailbox, partition)
else:
return self._simple_command('CREATE', mailbox)
### Overridden to support partition
### Pychecker: same here
def rename(self, from_mailbox, to_mailbox, partition=None):
"""Rename a from_mailbox to to_mailbox, partition is optional"""
if partition is not None:
return self._simple_command('RENAME', from_mailbox, to_mailbox, partition)
else:
return self._simple_command('RENAME', from_mailbox, to_mailbox)
def reconstruct(self, mailbox):
return self._simple_command('RECONSTRUCT', mailbox)
def login_plain(self, admin, password, asUser):
if asUser:
encoded = b2a_base64("%s\0%s\0%s" % (asUser, admin, password)).strip()
else:
encoded = b2a_base64("%s\0%s\0%s" % (admin, admin, password)).strip()
res, data = self._simple_command('AUTHENTICATE', 'PLAIN', encoded)
if ok(res):
self.state = 'AUTH'
return res, data
class CYRUS:
ERROR = {}
ERROR["CONNECT"] = [0, "Connection error"]
ERROR["INVALID_URL"] = [1, "Invalid URL"]
ERROR["ENCODING"] = [3, "Invalid encondig"]
ERROR["MBXNULL"] = [5, "Mailbox is Null"]
ERROR["NOAUTH"] = [7, "Connection is not authenticated"]
ERROR["LOGIN"] = [10, "User or password is wrong"]
ERROR["ADMIN"] = [11, "User is not cyrus administrator"]
ERROR["AUTH"] = [12, "Connection already authenticated"]
ERROR["LOGINPLAIN"] = [15, "Encryption needed to use mechanism"]
ERROR["LOGIN_PLAIN"] = [16, "User or password is wrong"]
ERROR["CREATE"] = [20, "Unable create mailbox"]
ERROR["DELETE"] = [25, "Unable delete mailbox"]
ERROR["GETACL"] = [30, "Unable parse GETACL result"]
ERROR["SETQUOTA"] = [40, "Invalid integer argument"]
ERROR["GETQUOTA"] = [45, "Quota root does not exist"]
ERROR["RENAME"] = [50, "Unable rename mailbox"]
ERROR["RECONSTRUCT"] = [60, "Unable reconstruct mailbox"]
ERROR["SUBSCRIBE"] = [70, "User is cyrus administrator, normal user required"]
ERROR["UNSUBSCRIBE"] = [75, "User is cyrus administrator, normal user required"]
ERROR["LSUB"] = [77, "User is cyrus administrator, normal user required"]
ERROR["UNKCMD"] = [98, "Command not implemented"]
ERROR["IMAPLIB"] = [99, "Generic imaplib error"]
ENCODING_LIST = ['imap', 'utf-8', 'iso-8859-1']
def __init__(self, url = 'imap://localhost:143'):
self.VERBOSE = False
self.AUTH = False
self.ADMIN = None
self.AUSER = None
self.ADMINACL = 'c'
self.SEP = DEFAULT_SEP
self.ENCODING = 'imap'
self.LOGFD = stdout
match = re_url.match(url)
if match:
host = match.group(2)
if match.group(3):
port = int(match.group(3))
else:
port = 143
else:
self.__doraise("INVALID_URL")
try:
if match.group(1) == 'imap':
self.ssl = False
self.m = IMAP4(host, port)
else:
self.ssl = True
self.m = IMAP4_SSL(host, port)
except:
self.__doraise("CONNECT")
def __del__(self):
if self.AUTH:
self.logout()
def __verbose(self, msg):
if self.VERBOSE:
print >> self.LOGFD, msg
def __doexception(self, function, msg=None, *args):
if msg is None:
try:
msg = self.ERROR.get(function.upper())[1]
except:
msg = self.ERROR.get("IMAPLIB")[1]
value = ""
for arg in args:
if arg is not None:
value = "%s %s" % (value, arg)
self.__verbose( '[%s%s] %s: %s' % (function.upper(), value, "BAD", msg) )
self.__doraise( function.upper(), msg )
def __doraise(self, mode, msg=None):
idError = self.ERROR.get(mode)
if idError:
if msg is None:
msg = idError[1]
else:
idError = [self.ERROR.get("IMAPLIB")[0]]
raise CYRUSError( idError[0], mode, msg )
def __prepare(self, command, mailbox=True):
if not self.AUTH:
self.__doexception(command, self.ERROR.get("NOAUTH")[1])
elif not mailbox:
self.__doexception(command, self.ERROR.get("MBXNULL")[1])
def __docommand(self, function, *args):
wrapped = getattr(self.m, function, None)
if wrapped is None:
raise self.__doraise("UNKCMD")
try:
res, msg = wrapped(*args)
if ok(res):
return res, msg
except Exception, info:
error = info.args[0].split(':').pop().strip()
if error.upper().startswith('BAD'):
error = error.split('BAD', 1).pop().strip()
error = unquote(error[1:-1], '\'')
self.__doexception(function, error, *args)
self.__doexception(function, msg[0], *args)
def xfer(self, mailbox, server):
"""Xfer a mailbox to server"""
return self.m._simple_command('XFER', mailbox, server)
def id(self):
self.__prepare('id')
res, data = self.m.id()
data = data.strip()
if not res or (len(data) < 3): return False, {}
data = data[1:-1] # Strip ()
res, rdata = res2dict(data)
if not res:
self.__verbose( '[ID] Umatched pairs in result' )
return res, rdata
def login(self, username, password):
if self.AUTH:
self.__doexception("LOGIN", self.ERROR.get("AUTH")[1])
try:
res, msg = self.m.login(username, password)
admin = self.m.isadmin()
except Exception, info:
error = info.args[0].split(':').pop().strip()
self.__doexception("LOGIN", error)
if admin:
self.ADMIN = username
self.SEP = self.m.getsep()
self.AUTH = True
self.__verbose( '[LOGIN %s] %s: %s' % (username, res, msg[0]) )
def login_plain(self, username, password, asUser = None):
if self.AUTH:
self.__doexception("LOGINPLAIN", self.ERROR.get("AUTH")[1])
if not self.ssl:
self.__doexception("LOGINPLAIN", self.ERROR.get("LOGINPLAIN")[1])
res, msg = self.__docommand("login_plain", username, password, asUser)
self.__verbose( '[AUTHENTICATE PLAIN %s] %s: %s' % (username, res, msg[0]) )
if ok(res):
if asUser is None:
if self.m.isadmin():
self.ADMIN = admin
else:
self.ADMIN = asUser
self.AUSER = asUser
self.SEP = self.m.getsep()
self.AUTH = True
def logout(self):
try:
res, msg = self.m.logout()
except Exception, info:
error = info.args[0].split(':').pop().strip()
self.__doexception("LOGOUT", error)
self.AUTH = False
self.ADMIN = None
self.AUSER = None
self.__verbose( '[LOGOUT] %s: %s' % (res, msg[0]) )
def getEncoding(self):
"""Get current input/ouput codification"""
return self.ENCODING
def setEncoding(self, enc = None):
"""Set current input/ouput codification"""
if enc is None:
self.ENCODING = 'imap'
elif enc in self.ENCODING_LIST:
self.ENCODING = enc
else:
raise self.__doraise("ENCODING")
def __encode(self, text):
if re.search("&", text):
text = re.sub("/", "+AC8-", text)
text = re.sub("&", "+", text)
text = unicode(text, 'utf-7').encode(self.ENCODING)
return text
def encode(self, text):
if self.ENCODING == 'imap':
return text
elif self.ENCODING in self.ENCODING_LIST:
return self.__encode(text)
def __decode(self, text):
text = re.sub("/", "-&", text)
text = re.sub(" ", "-@", text)
text = unicode(text, self.ENCODING).encode('utf-7')
text = re.sub("-@", " ", text)
text = re.sub("-&", "/", text)
text = re.sub("\+", "&", text)
return text
def decode(self, text):
if self.ENCODING == 'imap':
return text
elif self.ENCODING in self.ENCODING_LIST:
return self.__decode(text)
def lm(self, pattern="*"):
"""
List mailboxes, returns dict with list of mailboxes
To list all mailboxes lm()
To list users top mailboxes lm("user/%")
To list all users mailboxes lm("user/*")
To list users mailboxes startwith a word lm("user/word*")
To list global top folders lm("%")
To list global startwith a word unsupported by server
suggestion lm("word*")
"""
self.__prepare('LIST')
if pattern == '': pattern = "*"
if pattern == '%':
res, ml = self.__docommand('list', '', '%')
else:
res, ml = self.__docommand('list', '""', self.decode(pattern))
if not ok(res):
self.__verbose( '[LIST] %s: %s' % (res, ml) )
return []
if (len(ml) == 1) and ml[0] is None:
self.__verbose( '[LIST] No results' )
return []
mb = []
for mailbox in ml:
res = re_mb.match(mailbox)
if res is None: continue
mbe = unquote(res.group(2))
if 'Noselect' in getflags(res.group(1)): continue
mb.append(self.encode(mbe))
return mb
def cm(self, mailbox, partition=None):
"""Create mailbox"""
self.__prepare('CREATE', mailbox)
res, msg = self.__docommand('create', self.decode(mailbox), partition)
self.__verbose( '[CREATE %s partition=%s] %s: %s' % (mailbox, partition, res, msg[0]) )
def __dm(self, mailbox):
if not mailbox:
return True
self.__docommand("setacl", self.decode(mailbox), self.ADMIN, self.ADMINACL)
res, msg = self.__docommand("delete", self.decode(mailbox))
self.__verbose( '[DELETE %s] %s: %s' % (mailbox, res, msg[0]) )
def dm(self, mailbox, recursive=True):
"""Delete mailbox"""
self.__prepare('DELETE', mailbox)
mbxTmp = mailbox.split(self.SEP)
# Cyrus is not recursive for user subfolders and global folders
if (recursive and mbxTmp[0] != "user") or (len(mbxTmp) > 2):
mbxList = self.lm("%s%s*" % (mailbox, self.SEP))
mbxList.reverse()
for mbox in mbxList:
self.__dm(mbox)
self.__dm(mailbox)
def rename(self, fromMbx, toMbx, partition=None):
"""Rename or change partition"""
self.__prepare('RENAME', fromMbx)
# Rename is recursive! Amen!
res, msg = self.__docommand("rename", self.decode(fromMbx), self.decode(toMbx), partition)
self.__verbose( '[RENAME %s %s] %s: %s' % (fromMbx, toMbx, res, msg[0]) )
def lam(self, mailbox):
"""List ACLs"""
self.__prepare('GETACL', mailbox)
res, acl = self.__docommand("getacl", self.decode(mailbox))
acls = {}
aclList = splitquote(acl.pop().strip())
del aclList[0] # mailbox
for i in range(0, len(aclList), 2):
try:
userid = self.encode(aclList[i])
rights = aclList[i + 1]
except Exception, info:
self.__verbose( '[GETACL %s] BAD: %s' % (mailbox, info.args[0]) )
raise self.__doraise("GETACL")
self.__verbose( '[GETACL %s] %s %s' % (mailbox, userid, rights) )
acls[userid] = rights
return acls
def sam(self, mailbox, userid, rights):
"""Set ACL"""
self.__prepare('SETACL', mailbox)
res, msg = self.__docommand("setacl", self.decode(mailbox), userid, rights)
self.__verbose( '[SETACL %s %s %s] %s: %s' % (mailbox, userid, rights, res, msg[0]) )
def lq(self, mailbox):
"""List Quota"""
self.__prepare('GETQUOTA', mailbox)
res, msg = self.__docommand("getquota", self.decode(mailbox))
match = re_q0.match(msg[0])
if match:
self.__verbose( '[GETQUOTA %s] QUOTA (Unlimited)' % mailbox )
return None, None
match = re_q.match(msg[0])
if match is None:
self.__verbose( '[GETQUOTA %s] BAD: RegExp not matched, please report' % mailbox )
return None, None
try:
used = int(match.group(2))
quota = int(match.group(3))
self.__verbose( '[GETQUOTA %s] %s: QUOTA (%d/%d)' % (mailbox, res, used, quota) )
return used, quota
except:
self.__verbose( '[GETQUOTA %s] BAD: Error while parsing results' % mailbox )
return None, None
def lqr(self, mailbox):
"""List Quota Root"""
self.__prepare('GETQUOTAROOT', mailbox)
res, msg = self.__docommand("getquotaroot", self.decode(mailbox))
(_mailbox, _root) = msg[0][0].split()
match = re_q0.match(msg[1][0])
if match:
self.__verbose( '[GETQUOTAROOT %s] QUOTAROOT (Unlimited)' % mailbox )
return _root, None, None
match = re_q.match(msg[1][0])
try:
used = int(match.group(2))
quota = int(match.group(3))
self.__verbose( '[GETQUOTAROOT %s] %s: QUOTA (%d/%d)' % (mailbox, res, used, quota) )
return _root, used, quota
except:
self.__verbose( '[GETQUOTAROOT %s] BAD: Error while parsing results' % mailbox )
return _root, None, None
def sq(self, mailbox, limit):
"""Set Quota"""
self.__prepare('SETQUOTA', mailbox)
try:
limit = int(limit)
except ValueError, e:
self.__verbose( '[SETQUOTA %s] BAD: %s %s' % (mailbox, self.ERROR.get("SETQUOTA")[1], limit) )
raise self.__doraise("SETQUOTA")
res, msg = self.__docommand("setquota", self.decode(mailbox), limit)
self.__verbose( '[SETQUOTA %s %s] %s: %s' % (mailbox, limit, res, msg[0]) )
def getannotation(self, mailbox, pattern='*'):
"""Get Annotation"""
self.__prepare('GETANNOTATION')
res, data = self.__docommand('getannotation', self.decode(mailbox), pattern)
if (len(data) == 1) and data[0] is None:
self.__verbose( '[GETANNOTATION %s] No results' % (mailbox) )
return {}
ann = {}
for annotation in data:
self.__verbose( '[GETANNOTATION] RAW %r (length %d)' % (annotation,len(annotation)))
annotation = annotation.split('"')
self.__verbose( '[GETANNOTATION] SPLIT %r (length %d)' % (annotation,len(annotation)))
if not len(annotation) in [ 9, 17, 21, 37 ]:
self.__verbose( '[GETANNOTATION] Invalid annotation entry' )
continue
mbx = self.encode(annotation[1])
_key = annotation[3]
if annotation[5] == "value.shared":
key = "/shared%s" % (_key)
elif annotation[5] == "value.priv":
key = "/private%s" % (_key)
value = annotation[7]
self.__verbose( '[GETANNOTATION %s] %s: %s' % (mbx, key, value) )
if not ann.has_key(mbx):
ann[mbx] = {}
if not ann[mbx].has_key(key):
ann[mbx][key] = value
if len(annotation) > 21:
# There's another one hidden in here.
if annotation[21] == "value.shared":
key = "/shared%s" % (_key)
elif annotation[21] == "value.priv":
key = "/private%s" % (_key)
value = annotation[23]
self.__verbose( '[GETANNOTATION %s] %s: %s' % (mbx, key, value) )
if not ann.has_key(mbx):
ann[mbx] = {}
if not ann[mbx].has_key(key):
ann[mbx][key] = value
return ann
def setannotation(self, mailbox, annotation, value, shared=False):
"""Set Annotation"""
self.__prepare('SETANNOTATION')
res, msg = self.__docommand("setannotation", self.decode(mailbox), annotation, value, shared)
self.__verbose( '[SETANNOTATION %s] %s: %s' % (mailbox, res, msg[0]) )
def __reconstruct(self, mailbox):
if not mailbox:
return True
res, msg = self.__docommand("reconstruct", self.decode(mailbox))
self.__verbose( '[RECONSTRUCT %s] %s: %s' % (mailbox, res, msg[0]) )
def reconstruct(self, mailbox, recursive=True):
"""Reconstruct"""
self.__prepare('RECONSTRUCT', mailbox)
# Cyrus is not recursive for remote reconstruct
if recursive:
mbxList = self.lm("%s%s*" % (mailbox, self.SEP))
mbxList.reverse()
for mbox in mbxList:
self.__reconstruct(mbox)
self.__reconstruct(mailbox)
def lsub(self, pattern="*"):
if self.AUSER is None:
self.__doexception("lsub")
self.__prepare('LSUB')
if pattern == '': pattern = "*"
res, ml = self.__docommand('lsub', '*', pattern)
if not ok(res):
self.__verbose( '[LIST] %s: %s' % (res, ml) )
return []
if (len(ml) == 1) and ml[0] is None:
self.__verbose( '[LIST] No results' )
return []
mb = []
for mailbox in ml:
res = re_mb.match(mailbox)
if res is None: continue
mbe = unquote(res.group(2))
if 'Noselect' in getflags(res.group(1)): continue
mb.append(self.encode(mbe))
return mb
def subscribe(self, mailbox):
"""Subscribe"""
self.__prepare('SUBSCRIBE')
res, msg = self.__docommand("subscribe", self.decode(mailbox))
self.__verbose( '[SUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) )
def unsubscribe(self, mailbox):
"""Unsubscribe"""
self.__prepare('UNSUBSCRIBE')
res, msg = self.__docommand("unsubscribe", self.decode(mailbox))
self.__verbose( '[UNSUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) )

File Metadata

Mime Type
text/x-script.python
Expires
Mon, Apr 6, 1:27 AM (1 d, 20 h ago)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
37/fa/b038c8b25db1e08cdc2ea6e417e8
Default Alt Text
cyruslib.py (26 KB)

Event Timeline