Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117875491
D4051.1775334234.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
21 KB
Referenced Files
None
Subscribers
None
D4051.1775334234.diff
View Options
diff --git a/cyruslib.py b/cyruslib.py
--- a/cyruslib.py
+++ b/cyruslib.py
@@ -48,6 +48,8 @@
'ID' : ('AUTH',), # Only one ID allowed in non auth mode
'GETANNOTATION': ('AUTH',),
'SETANNOTATION': ('AUTH',),
+ 'GETMETADATA': ('AUTH',),
+ 'SETMETADATA': ('AUTH',),
'XFER' : ('AUTH',)
}
@@ -193,6 +195,41 @@
return self._untagged_response(typ, dat, 'ANNOTATION')
+ def getmetadata(self, mailbox, pattern='*', shared=None):
+ # If pattern is '*' clean pattern and search all entries under /shared
+ # and/or /private (depends on the shared parameter value) to emulate the
+ # ANNOTATEMORE behaviour
+ if pattern == '*':
+ pattern = ''
+ options = '(DEPTH infinity)'
+ else:
+ options = '(DEPTH 0)'
+ if shared == None:
+ entries = '( /shared%s /private%s )' % (pattern, pattern)
+ elif shared:
+ entries = "/shared%s" % pattern
+ else:
+ entries = " /private%s" % pattern
+
+ typ, dat = self._simple_command('GETMETADATA', options, mailbox, entries)
+
+ return self._untagged_response(typ, dat, 'METADATA')
+
+ def setmetadata(self, mailbox, desc, value, shared=False):
+ if value:
+ value = value.join(['"', '"'])
+ else:
+ value = "NIL"
+
+ if shared:
+ typ, dat = self._simple_command('SETMETADATA', mailbox,
+ "(/shared%s %s)" % (desc,value))
+ else:
+ typ, dat = self._simple_command('SETMETADATA', mailbox,
+ "(/private%s %s)" % (desc,value))
+
+ return self._untagged_response(typ, dat, 'METADATA')
+
def setquota(self, mailbox, limit):
"""Set quota of a mailbox"""
if limit == 0:
@@ -281,6 +318,41 @@
return self._untagged_response(typ, dat, 'ANNOTATION')
+ def getmetadata(self, mailbox, pattern='*', shared=None):
+ # If pattern is '*' clean pattern and search all entries under /shared
+ # and/or /private (depends on the shared parameter value) to emulate the
+ # ANNOTATEMORE behaviour
+ if pattern == '*':
+ pattern = ''
+ options = '(DEPTH infinity)'
+ else:
+ options = '(DEPTH 0)'
+ if shared == None:
+ entries = '( /shared%s /private%s )' % (pattern, pattern)
+ elif shared:
+ entries = "/shared%s" % pattern
+ else:
+ entries = " /private%s" % pattern
+
+ typ, dat = self._simple_command('GETMETADATA', options, mailbox, entries)
+
+ return self._untagged_response(typ, dat, 'METADATA')
+
+ def setmetadata(self, mailbox, desc, value, shared=False):
+ if value:
+ value = value.join(['"', '"'])
+ else:
+ value = "NIL"
+
+ if shared:
+ typ, dat = self._simple_command('SETMETADATA', mailbox,
+ "(/shared%s %s)" % (desc,value))
+ else:
+ typ, dat = self._simple_command('SETMETADATA', mailbox,
+ "(/private%s %s)" % (desc,value))
+
+ return self._untagged_response(typ, dat, 'METADATA')
+
def setquota(self, mailbox, limit):
"""Set quota of a mailbox"""
if limit == 0:
@@ -695,140 +767,116 @@
res, msg = self.__docommand("setquota", self.decode(mailbox), limit)
self.__verbose( '[SETQUOTA %s %s] %s: %s' % (mailbox, limit, res, msg[0]) )
- def getannotation(self, mailbox, pattern='*'):
- """Get Annotation"""
- self.__prepare('GETANNOTATION')
- res, data = self.__docommand('getannotation', self.decode(mailbox), pattern)
-
- if (len(data) == 1) and data[0] is None:
- self.__verbose( '[GETANNOTATION %s] No results' % (mailbox) )
- return {}
-
- ann = {}
- annotations = []
- empty_values = [ "NIL", '" "', None, '', ' ' ]
-
- concat_items = []
- for item in data:
- if isinstance(item, tuple):
- item = ' '.join([str(x) for x in item])
- if len(concat_items) > 0:
- concat_items.append(item)
- if ''.join(concat_items).count('(') == ''.join(concat_items).count(')'):
- annotations.append(''.join(concat_items))
- concat_items = []
- continue
- else:
-
- if item.count('(') == item.count(')'):
- annotations.append(item)
- continue
- else:
- concat_items.append(item)
- continue
+ def getmetadata(self, mailbox, pattern='*', shared=None):
+ """Get Metadata"""
+ # This test needs to be reviewed
+ #if not self.metadata:
+ # return {}
- for annotation in annotations:
- annotation = annotation.strip()
-
- if not annotation[0] == '"':
- folder = annotation.split('"')[0].replace('"','').strip()
- key = annotation.split('"')[1].replace('"','').replace("'","").strip()
- _annot = annotation.split('(')[1].split(')')[0].strip()
- else:
- folder = annotation.split('"')[1].replace('"','').strip()
- key = annotation.split('"')[3].replace('"','').replace("'","").strip()
- _annot = annotation.split('(')[1].split(')')[0].strip()
-
- if folder not in ann:
- ann[folder] = {}
-
- try:
- value_priv = _annot[(_annot.index('"value.priv"')+len('"value.priv"')):_annot.index('"size.priv"')].strip()
- except ValueError:
- value_priv = None
+ # Annotations vs. Metadata fix ... we set a pattern that we know is
+ # good enough for our purposes for now, but the fact is that the
+ # calling programs should be fixed instead.
- try:
- size_priv = _annot[(_annot.index('"size.priv"')+len('"size.priv"')):].strip().split('"')[1].strip()
- try:
- value_priv = value_priv[value_priv.index('{%s}' % (size_priv))+len('{%s}' % (size_priv)):].strip()
- except Exception:
- pass
- except Exception:
- pass
-
- if value_priv in empty_values:
- value_priv = None
- else:
- try:
- value_priv = value_priv[:value_priv.index('"content-type.priv"')].strip()
- except:
- pass
+ res, data = self.__docommand("getmetadata", self.decode(mailbox), pattern, shared)
- try:
- value_priv = value_priv[:value_priv.index('"modifiedsince.priv"')].strip()
- except:
- pass
+ if (len(data) == 1) and data[0] is None:
+ self.__verbose( '[GETMETADATA %s] No results' % (mailbox) )
+ return {}
- if value_priv.startswith('"'):
- value_priv = value_priv[1:]
+ # Get the first response line (it can be a string or a tuple)
+ if isinstance(data[0], tuple):
+ fline = data[0][0]
+ else:
+ fline = data[0]
+
+ # Find the folder name
+ fbeg = 0
+ fend = -1
+ if fline[0] == '"':
+ # Quoted name
+ fbeg = 1
+ i = 1
+ while i < len(fline):
+ if fline[i] == '"':
+ # folder name ended unless the previous char is \ (we
+ # should test more, this test would fail if we had a \
+ # at the end of the folder name, but we leave it at that
+ # right now)
+ if fline[i-1] != '\\':
+ fend = i
+ break
+ i += 1
+ else:
+ # For unquoted names the first word is the folder name
+ fend = fline.find(' ')
- if value_priv.endswith('"'):
- value_priv = value_priv[:-1]
+ # No mailbox found
+ if fend < 0:
+ self.__verbose( '[GETMETADATA %s] Mailbox not found in results' % (mailbox) )
+ return {}
- if value_priv in empty_values:
- value_priv = None
+ # Folder name
+ folder = fline[fbeg:fend]
+
+ # Check mailbox name against the folder name
+ if folder != mailbox:
+ quoted_mailbox = "\"%s\"" % (mailbox)
+ if folder != quoted_mailbox:
+ self.__verbose(
+ '[GETMETADATA %s] Mailbox \'%s\' is not the same as \'%s\'' \
+ % (mailbox, quoted_mailbox, folder)
+ )
+ return {}
+
+ # Process the rest of the first line, the first value will be
+ # available after the first '(' found
+ i=fend
+ ebeg = -1
+ while i < len(fline):
+ if fline[i] == '(':
+ ebeg = i+1
+ break
+ i += 1
+
+ if ebeg < 0:
+ self.__verbose(
+ '[GETMETADATA %s] Mailbox has no values, skipping' % (mailbox)
+ )
+ return {}
- try:
- value_shared = _annot[(_annot.index('"value.shared"')+len('"value.shared"')):_annot.index('"size.shared"')].strip()
- except ValueError:
- value_shared = None
+ # This variable will start with an entry name and will continue with
+ # the value length or the value
+ nfline = fline[ebeg:]
+ if isinstance(data[0], tuple):
+ entries = _get_line_entries((nfline,) + data[0][1:])
+ else:
+ entries = _get_line_entries((nfline,))
- try:
- size_shared = _annot[(_annot.index('"size.shared"')+len('"size.shared"')):].strip().split('"')[1].strip()
- try:
- value_shared = value_shared[value_shared.index('{%s}' % (size_shared))+len('{%s}' % (size_shared)):].strip()
- except Exception:
- pass
- except Exception:
- pass
-
- if value_shared in empty_values:
- value_shared = None
+ for line in data[1:]:
+ if isinstance(line, tuple):
+ lentries = _get_line_entries(line)
else:
- try:
- value_shared = value_shared[:value_shared.index('"content-type.shared"')].strip()
- except:
- pass
-
- try:
- value_shared = value_shared[:value_shared.index('"modifiedsince.shared"')].strip()
- except:
- pass
-
- if value_shared.startswith('"'):
- value_shared = value_shared[1:]
-
- if value_shared.endswith('"'):
- value_shared = value_shared[:-1]
+ lentries = _get_line_entries([line,])
- if value_shared in empty_values:
- value_shared = None
+ if lentries != None and lentries != {}:
+ entries.update(lentries)
- if not value_priv == None:
- ann[folder]['/private' + key] = value_priv
+ mdat = { mailbox: entries };
+ return mdat
- if not value_shared == None:
- ann[folder]['/shared' + key] = value_shared
+ def setmetadata(self, mailbox, desc, value, shared=False):
+ """Set METADATA"""
+ res, msg = self.__docommand("setmetadata", self.decode(mailbox), desc, value, shared)
+ self.__verbose( '[SETMETADATA %s] %s: %s' % (mailbox, res, msg[0]) )
- return ann
+ # Backwards compat
+ # def getannotation(self, *args, **kw):
+ # return self._getmetadata(*args, **kw)
- def setannotation(self, mailbox, annotation, value, shared=False):
- """Set Annotation"""
- self.__prepare('SETANNOTATION')
- res, msg = self.__docommand("setannotation", self.decode(mailbox), annotation, value, shared)
- self.__verbose( '[SETANNOTATION %s] %s: %s' % (mailbox, res, msg[0]) )
+ # def setannotation(self, *args, **kw):
+ # return self._setmetadata(*args, **kw)
def __reconstruct(self, mailbox):
if not mailbox:
diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py
--- a/pykolab/imap/cyrus.py
+++ b/pykolab/imap/cyrus.py
@@ -246,7 +246,7 @@
while 1:
num_try += 1
- annotations = self._getannotation(
+ annotations = self._getmetadata(
'"%s"' % (mailfolder),
ann_path
)
@@ -363,12 +363,12 @@
partition
)
- def _getannotation(self, *args, **kw):
- return self.getannotation(*args, **kw)
+ def _getmetadata(self, *args, **kw):
+ return self.getmetadata(*args, **kw)
- def _setannotation(self, mailfolder, annotation, value, shared=False):
+ def _setmetadata(self, mailfolder, metadata, value, shared=False):
"""
- Login to the actual backend server, then set annotation.
+ Login to the actual backend server, then set metadata.
"""
try:
server = self.find_mailfolder_server(mailfolder)
@@ -376,24 +376,32 @@
server = self.server
log.debug(
- _("Setting annotation %s on folder %s") % (
- annotation,
+ _("Setting metadata %s on folder %s") % (
+ metadata,
mailfolder
),
level=8
)
try:
- self.setannotation(mailfolder, annotation, value, shared)
+ self.setmetadata(mailfolder, metadata, value, shared)
except cyruslib.CYRUSError as errmsg:
log.error(
- _("Could not set annotation %r on mail folder %r: %r") % (
- annotation,
+ _("Could not set metadata %r on mail folder %r: %r") % (
+ metadata,
mailfolder,
errmsg
)
)
+ # Use metadata instead of annotations
+ def _getannotation(self, *args, **kw):
+ return self._getmetadata(*args, **kw)
+
+ # Use metadata instead of annotations
+ def _setannotation(self, *args, **kw):
+ return self._setmetadata(*args, **kw)
+
def _xfer(self, mailfolder, current_server, new_server):
self.connect(self.uri.replace(self.server, current_server))
log.debug(
diff --git a/pykolab/imap/dovecot.py b/pykolab/imap/dovecot.py
--- a/pykolab/imap/dovecot.py
+++ b/pykolab/imap/dovecot.py
@@ -57,59 +57,6 @@
log = pykolab.getLogger('pykolab.imap')
conf = pykolab.getConf()
-# BEG: Add GETMETADATA and SETMETADATA support to the cyruslib IMAP objects
-
-Commands = {
- 'GETMETADATA': ('AUTH',),
- 'SETMETADATA': ('AUTH',),
-}
-
-imaplib.Commands.update(Commands)
-
-def imap_getmetadata(self, mailbox, pattern='*', shared=None):
- # If pattern is '*' clean pattern and search all entries under /shared
- # and/or /private (depens on the shared parameter value) to emulate the
- # ANNOTATEMORE behaviour
- if pattern == '*':
- pattern = ''
- options = '(DEPTH infinity)'
- else:
- options = '(DEPTH 0)'
- if shared == None:
- entries = '( /shared%s /private%s )' % (pattern, pattern)
- elif shared:
- entries = "/shared%s" % pattern
- else:
- entries = " /private%s" % pattern
-
- typ, dat = self._simple_command('GETMETADATA', options, mailbox, entries)
-
- return self._untagged_response(typ, dat, 'METADATA')
-
-def imap_setmetadata(self, mailbox, desc, value, shared=False):
- if value:
- value = value.join(['"', '"'])
- else:
- value = "NIL"
-
- if shared:
- typ, dat = self._simple_command('SETMETADATA', mailbox,
- "(/shared%s %s)" % (desc,value))
- else:
- typ, dat = self._simple_command('SETMETADATA', mailbox,
- "(/private%s %s)" % (desc,value))
-
- return self._untagged_response(typ, dat, 'METADATA')
-
-# Bind the new methods to the cyruslib IMAP4 and IMAP4_SSL objects
-from types import MethodType
-cyruslib.IMAP4.getmetadata = MethodType(imap_getmetadata, None, cyruslib.IMAP4)
-cyruslib.IMAP4.setmetadata = MethodType(imap_setmetadata, None, cyruslib.IMAP4)
-cyruslib.IMAP4_SSL.getmetadata = MethodType(imap_getmetadata, None, cyruslib.IMAP4_SSL)
-cyruslib.IMAP4_SSL.setmetadata = MethodType(imap_setmetadata, None, cyruslib.IMAP4_SSL)
-
-# END: Add GETMETADATA and SETMETADATA support to the cyruslib IMAP objects
-
# Auxiliary functions
def _get_line_entries(lines):
"""Function to get metadata entries """
@@ -330,126 +277,15 @@
self.m.rename(self.folder_utf7(from_mailfolder), self.folder_utf7(to_mailfolder), '"%s"' % (partition))
-# BEG: METADATA support functions ... quite similar to annotations, really
-
- def _getmetadata(self, mailbox, pattern='*', shared=None):
- """Get Metadata"""
- # This test needs to be reviewed
- #if not self.metadata:
- # return {}
-
- # Annotations vs. Metadata fix ... we set a pattern that we know is
- # good enough for our purposes for now, but the fact is that the
- # calling programs should be fixed instead.
-
- res, data = self.m.getmetadata(self.decode(mailbox), pattern, shared)
-
- if (len(data) == 1) and data[0] is None:
- self.__verbose( '[GETMETADATA %s] No results' % (mailbox) )
- return {}
-
- # Get the first response line (it can be a string or a tuple)
- if isinstance(data[0], tuple):
- fline = data[0][0]
- else:
- fline = data[0]
-
- # Find the folder name
- fbeg = 0
- fend = -1
- if fline[0] == '"':
- # Quoted name
- fbeg = 1
- i = 1
- while i < len(fline):
- if fline[i] == '"':
- # folder name ended unless the previous char is \ (we
- # should test more, this test would fail if we had a \
- # at the end of the folder name, but we leave it at that
- # right now
- if fline[i-1] != '\\':
- fend = i
- break
- i += 1
- else:
- # For unquoted names the first word is the folder name
- fend = fline.find(' ')
-
- # No mailbox found
- if fend < 0:
- self.__verbose( '[GETMETADATA %s] Mailbox not found in results' % (mailbox) )
- return {}
-
- # Folder name
- folder = fline[fbeg:fend]
-
- # Check mailbox name against the folder name
- if folder != mailbox:
- quoted_mailbox = "\"%s\"" % (mailbox)
- if folder != quoted_mailbox:
- self.__verbose(
- '[GETMETADATA %s] Mailbox \'%s\' is not the same as \'%s\'' \
- % (mailbox, quoted_mailbox, folder)
- )
- return {}
-
- # Process the rest of the first line, the first value will be
- # available after the first '(' found
- i=fend
- ebeg = -1
- while i < len(fline):
- if fline[i] == '(':
- ebeg = i+1
- break
- i += 1
-
- if ebeg < 0:
- self.__verbose(
- '[GETMETADATA %s] Mailbox has no values, skipping' % (mailbox)
- )
- return {}
-
- # This variable will start with an entry name and will continue with
- # the value lenght or the value
- nfline = fline[ebeg:]
- if isinstance(data[0], tuple):
- entries = _get_line_entries((nfline,) + data[0][1:])
- else:
- entries = _get_line_entries((nfline,))
-
- for line in data[1:]:
- if isinstance(line, tuple):
- lentries = _get_line_entries(line)
- else:
- lentries = _get_line_entries([line,])
-
- if lentries != None and lentries != {}:
- entries.update(lentries)
-
- mdat = { mailbox: entries };
- return mdat
-
- def _setmetadata(self, mailbox, desc, value, shared=False):
- """Set METADADATA"""
- res, msg = self.m.setmetadata(self.decode(mailbox), desc, value, shared)
- self.__verbose( '[SETMETADATA %s] %s: %s' % (mailbox, res, msg[0]) )
# Use metadata instead of annotations
def _getannotation(self, *args, **kw):
return self._getmetadata(*args, **kw)
- def getannotation(self, *args, **kw):
- return self._getmetadata(*args, **kw)
-
# Use metadata instead of annotations
def _setannotation(self, *args, **kw):
return self._setmetadata(*args, **kw)
- def setannotation(self, *args, **kw):
- return self._setmetadata(*args, **kw)
-
-# END: METADATA / Annotations
-
# The functions that follow are the same ones used with Cyrus, probably a
# review is needed
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Apr 4, 8:23 PM (8 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18831066
Default Alt Text
D4051.1775334234.diff (21 KB)
Attached To
Mode
D4051: METADATA support
Attached
Detach File
Event Timeline