Changeset View
Changeset View
Standalone View
Standalone View
pykolab/imap/dovecot.py
Show First 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | |||||
import pykolab | import pykolab | ||||
from pykolab.imap import IMAP | from pykolab.imap import IMAP | ||||
from pykolab.translate import _ | from pykolab.translate import _ | ||||
log = pykolab.getLogger('pykolab.imap') | log = pykolab.getLogger('pykolab.imap') | ||||
conf = pykolab.getConf() | conf = pykolab.getConf() | ||||
# BEG: Add GETMETADATA and SETMETADATA support to the cyruslib IMAP objects | |||||
Commands = { | |||||
'GETMETADATA': ('AUTH',), | |||||
'SETMETADATA': ('AUTH',), | |||||
} | |||||
imaplib.Commands.update(Commands) | |||||
def imap_getmetadata(self, mailbox, pattern='*', shared=None): | |||||
# If pattern is '*' clean pattern and search all entries under /shared | |||||
# and/or /private (depens on the shared parameter value) to emulate the | |||||
# ANNOTATEMORE behaviour | |||||
if pattern == '*': | |||||
pattern = '' | |||||
options = '(DEPTH infinity)' | |||||
else: | |||||
options = '(DEPTH 0)' | |||||
if shared == None: | |||||
entries = '( /shared%s /private%s )' % (pattern, pattern) | |||||
elif shared: | |||||
entries = "/shared%s" % pattern | |||||
else: | |||||
entries = " /private%s" % pattern | |||||
typ, dat = self._simple_command('GETMETADATA', options, mailbox, entries) | |||||
return self._untagged_response(typ, dat, 'METADATA') | |||||
def imap_setmetadata(self, mailbox, desc, value, shared=False): | |||||
if value: | |||||
value = value.join(['"', '"']) | |||||
else: | |||||
value = "NIL" | |||||
if shared: | |||||
typ, dat = self._simple_command('SETMETADATA', mailbox, | |||||
"(/shared%s %s)" % (desc,value)) | |||||
else: | |||||
typ, dat = self._simple_command('SETMETADATA', mailbox, | |||||
"(/private%s %s)" % (desc,value)) | |||||
return self._untagged_response(typ, dat, 'METADATA') | |||||
# Bind the new methods to the cyruslib IMAP4 and IMAP4_SSL objects | |||||
from types import MethodType | |||||
cyruslib.IMAP4.getmetadata = MethodType(imap_getmetadata, None, cyruslib.IMAP4) | |||||
cyruslib.IMAP4.setmetadata = MethodType(imap_setmetadata, None, cyruslib.IMAP4) | |||||
cyruslib.IMAP4_SSL.getmetadata = MethodType(imap_getmetadata, None, cyruslib.IMAP4_SSL) | |||||
cyruslib.IMAP4_SSL.setmetadata = MethodType(imap_setmetadata, None, cyruslib.IMAP4_SSL) | |||||
# END: Add GETMETADATA and SETMETADATA support to the cyruslib IMAP objects | |||||
# Auxiliary functions | |||||
def _get_line_entries(lines): | |||||
"""Function to get metadata entries """ | |||||
entries = {} | |||||
name = None | |||||
value = "" | |||||
vlen = 0 | |||||
for line in lines: | |||||
line_len = len(line) | |||||
i = 0 | |||||
while i < line_len: | |||||
if name == None: | |||||
if line[i] == '/': | |||||
j = i | |||||
while j < line_len: | |||||
if line[j] == ' ': | |||||
break | |||||
j += 1 | |||||
name = line[i:j] | |||||
i = j | |||||
elif vlen != 0: | |||||
j = i + vlen | |||||
if j > line_len: | |||||
value += line[i:line_len] | |||||
vlen -= line_len - i | |||||
else: | |||||
value += line[i:i+vlen] | |||||
if value in ('', 'NIL'): | |||||
entries[name] = "" | |||||
else: | |||||
entries[name] = value | |||||
name = None | |||||
value = "" | |||||
vlen = 0 | |||||
elif line[i] == '{': | |||||
j = i | |||||
while j < line_len: | |||||
if line[j] == '}': | |||||
vlen = int(line[i+1:j]) | |||||
break | |||||
j += 1 | |||||
i = j | |||||
elif line[i] != ' ': | |||||
j = i | |||||
if line[i] == '"': | |||||
while j < line_len: | |||||
# Skip quoted text | |||||
if line[j] == '\\': | |||||
j += 2 | |||||
continue | |||||
elif line[j] == '"': | |||||
break | |||||
j += 1 | |||||
else: | |||||
while j < line_len: | |||||
if line[j] == ' ' or line[j] == ')': | |||||
break | |||||
j += 1 | |||||
value = line[i:j] | |||||
if value in ('', 'NIL'): | |||||
entries[name] = "" | |||||
else: | |||||
entries[name] = value | |||||
name = None | |||||
value = "" | |||||
i = j | |||||
i += 1 | |||||
return entries | |||||
class Dovecot(cyruslib.CYRUS): | class Dovecot(cyruslib.CYRUS): | ||||
""" | """ | ||||
Abstraction class for some common actions to do exclusively in | Abstraction class for some common actions to do exclusively in | ||||
Dovecot. | Dovecot. | ||||
Initially based on the Cyrus driver, will remove dependencies on | Initially based on the Cyrus driver, will remove dependencies on | ||||
cyruslib later; right now this module has only been tested to use the | cyruslib later; right now this module has only been tested to use the | ||||
dovecot metadata support (no quota or folder operations tests have | dovecot metadata support (no quota or folder operations tests have | ||||
▲ Show 20 Lines • Show All 135 Lines • ▼ Show 20 Lines | def _rename(self, from_mailfolder, to_mailfolder, partition=None): | ||||
# Removed server reconnection for dovecot, we only have one server | # Removed server reconnection for dovecot, we only have one server | ||||
if not partition == None: | if not partition == None: | ||||
log.debug(_("Moving INBOX folder %s to %s on partition %s") % (from_mailfolder,to_mailfolder, partition), level=8) | log.debug(_("Moving INBOX folder %s to %s on partition %s") % (from_mailfolder,to_mailfolder, partition), level=8) | ||||
else: | else: | ||||
log.debug(_("Moving INBOX folder %s to %s") % (from_mailfolder,to_mailfolder), level=8) | log.debug(_("Moving INBOX folder %s to %s") % (from_mailfolder,to_mailfolder), level=8) | ||||
self.m.rename(self.folder_utf7(from_mailfolder), self.folder_utf7(to_mailfolder), '"%s"' % (partition)) | self.m.rename(self.folder_utf7(from_mailfolder), self.folder_utf7(to_mailfolder), '"%s"' % (partition)) | ||||
# BEG: METADATA support functions ... quite similar to annotations, really | |||||
def _getmetadata(self, mailbox, pattern='*', shared=None): | |||||
"""Get Metadata""" | |||||
# This test needs to be reviewed | |||||
#if not self.metadata: | |||||
# return {} | |||||
# Annotations vs. Metadata fix ... we set a pattern that we know is | |||||
# good enough for our purposes for now, but the fact is that the | |||||
# calling programs should be fixed instead. | |||||
res, data = self.m.getmetadata(self.decode(mailbox), pattern, shared) | |||||
if (len(data) == 1) and data[0] is None: | |||||
self.__verbose( '[GETMETADATA %s] No results' % (mailbox) ) | |||||
return {} | |||||
# Get the first response line (it can be a string or a tuple) | |||||
if isinstance(data[0], tuple): | |||||
fline = data[0][0] | |||||
else: | |||||
fline = data[0] | |||||
# Find the folder name | |||||
fbeg = 0 | |||||
fend = -1 | |||||
if fline[0] == '"': | |||||
# Quoted name | |||||
fbeg = 1 | |||||
i = 1 | |||||
while i < len(fline): | |||||
if fline[i] == '"': | |||||
# folder name ended unless the previous char is \ (we | |||||
# should test more, this test would fail if we had a \ | |||||
# at the end of the folder name, but we leave it at that | |||||
# right now | |||||
if fline[i-1] != '\\': | |||||
fend = i | |||||
break | |||||
i += 1 | |||||
else: | |||||
# For unquoted names the first word is the folder name | |||||
fend = fline.find(' ') | |||||
# No mailbox found | |||||
if fend < 0: | |||||
self.__verbose( '[GETMETADATA %s] Mailbox not found in results' % (mailbox) ) | |||||
return {} | |||||
# Folder name | |||||
folder = fline[fbeg:fend] | |||||
# Check mailbox name against the folder name | |||||
if folder != mailbox: | |||||
quoted_mailbox = "\"%s\"" % (mailbox) | |||||
if folder != quoted_mailbox: | |||||
self.__verbose( | |||||
'[GETMETADATA %s] Mailbox \'%s\' is not the same as \'%s\'' \ | |||||
% (mailbox, quoted_mailbox, folder) | |||||
) | |||||
return {} | |||||
# Process the rest of the first line, the first value will be | |||||
# available after the first '(' found | |||||
i=fend | |||||
ebeg = -1 | |||||
while i < len(fline): | |||||
if fline[i] == '(': | |||||
ebeg = i+1 | |||||
break | |||||
i += 1 | |||||
if ebeg < 0: | |||||
self.__verbose( | |||||
'[GETMETADATA %s] Mailbox has no values, skipping' % (mailbox) | |||||
) | |||||
return {} | |||||
# This variable will start with an entry name and will continue with | |||||
# the value lenght or the value | |||||
nfline = fline[ebeg:] | |||||
if isinstance(data[0], tuple): | |||||
entries = _get_line_entries((nfline,) + data[0][1:]) | |||||
else: | |||||
entries = _get_line_entries((nfline,)) | |||||
for line in data[1:]: | |||||
if isinstance(line, tuple): | |||||
lentries = _get_line_entries(line) | |||||
else: | |||||
lentries = _get_line_entries([line,]) | |||||
if lentries != None and lentries != {}: | |||||
entries.update(lentries) | |||||
mdat = { mailbox: entries }; | |||||
return mdat | |||||
def _setmetadata(self, mailbox, desc, value, shared=False): | |||||
"""Set METADADATA""" | |||||
res, msg = self.m.setmetadata(self.decode(mailbox), desc, value, shared) | |||||
self.__verbose( '[SETMETADATA %s] %s: %s' % (mailbox, res, msg[0]) ) | |||||
# Use metadata instead of annotations | # Use metadata instead of annotations | ||||
def _getannotation(self, *args, **kw): | def _getannotation(self, *args, **kw): | ||||
return self._getmetadata(*args, **kw) | return self._getmetadata(*args, **kw) | ||||
def getannotation(self, *args, **kw): | |||||
return self._getmetadata(*args, **kw) | |||||
# Use metadata instead of annotations | # Use metadata instead of annotations | ||||
def _setannotation(self, *args, **kw): | def _setannotation(self, *args, **kw): | ||||
return self._setmetadata(*args, **kw) | return self._setmetadata(*args, **kw) | ||||
def setannotation(self, *args, **kw): | |||||
return self._setmetadata(*args, **kw) | |||||
# END: METADATA / Annotations | |||||
# The functions that follow are the same ones used with Cyrus, probably a | # The functions that follow are the same ones used with Cyrus, probably a | ||||
# review is needed | # review is needed | ||||
def _xfer(self, mailfolder, current_server, new_server): | def _xfer(self, mailfolder, current_server, new_server): | ||||
self.connect(self.uri.replace(self.server,current_server)) | self.connect(self.uri.replace(self.server,current_server)) | ||||
log.debug(_("Transferring folder %s from %s to %s") % (mailfolder, current_server, new_server), level=8) | log.debug(_("Transferring folder %s from %s to %s") % (mailfolder, current_server, new_server), level=8) | ||||
self.xfer(mailfolder, new_server) | self.xfer(mailfolder, new_server) | ||||
▲ Show 20 Lines • Show All 179 Lines • Show Last 20 Lines |