diff --git a/pykolab/conf/__init__.py b/pykolab/conf/__init__.py index ca19ac9..eb56bf9 100644 --- a/pykolab/conf/__init__.py +++ b/pykolab/conf/__init__.py @@ -1,799 +1,799 @@ # -*- coding: utf-8 -*- # Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # from __future__ import print_function import logging import os import sys from optparse import OptionParser try: from ConfigParser import SafeConfigParser except ImportError: from configparser import SafeConfigParser import pykolab from pykolab.conf.defaults import Defaults from pykolab.constants import * from pykolab.translate import _ log = pykolab.getLogger('pykolab.conf') class Conf(object): def __init__(self): """ self.cli_args == Arguments passed on the CLI self.cli_keywords == Parser results (again, CLI) self.cli_parser == The actual Parser (from OptionParser) self.plugins == Our Kolab Plugins """ self.cli_parser = None self.cli_args = None self.cli_keywords = None self.entitlement = None self.changelog = {} try: from pykolab.conf.entitlement import Entitlement entitlements = True except Exception: entitlements = False pass if entitlements: self.entitlement = Entitlement().get() self.plugins = None # The location where our configuration parser is going to end up self.cfg_parser = None # Create the options self.create_options() def finalize_conf(self, fatal=True): self.create_options_from_plugins() self.parse_options(fatal=fatal) # The defaults can some from; # - a file we ship with the packages # - a customly supplied file (by customer) # - a file we write out # - this python class # # Look, we want defaults self.defaults = Defaults(self.plugins) # But, they should be available in our class as well for option in self.defaults.__dict__: log.debug( _("Setting %s to %r (from defaults)") % ( option, self.defaults.__dict__[option] ), level=8 ) setattr(self, option, self.defaults.__dict__[option]) # This is where we check our parser for the defaults being set there. self.set_defaults_from_cli_options() self.options_set_from_config() # Also set the cli options if hasattr(self, 'cli_keywords') and self.cli_keywords is not None: for option in self.cli_keywords.__dict__: retval = False if hasattr(self, "check_setting_%s" % (option)): retval = eval( "self.check_setting_%s(%r)" % ( option, self.cli_keywords.__dict__[option] ) ) # The warning, error or confirmation dialog is in the check_setting_%s() # function if not retval: continue log.debug( _("Setting %s to %r (from CLI, verified)") % ( option, self.cli_keywords.__dict__[option] ), level=8 ) setattr(self, option, self.cli_keywords.__dict__[option]) else: log.debug( _("Setting %s to %r (from CLI, not checked)") % ( option, self.cli_keywords.__dict__[option] ), level=8 ) setattr(self, option, self.cli_keywords.__dict__[option]) def load_config(self, config): """ Given a SafeConfigParser instance, loads a configuration file and checks, then sets everything it can find. """ for section in self.defaults.__dict__: if section == 'testing': continue if not config.has_section(section): continue for key in self.defaults.__dict__[section]: retval = False if not config.has_option(section, key): continue if isinstance(self.defaults.__dict__[section][key], int): value = config.getint(section, key) elif isinstance(self.defaults.__dict__[section][key], bool): value = config.getboolean(section, key) elif isinstance(self.defaults.__dict__[section][key], str): value = config.get(section, key) elif isinstance(self.defaults.__dict__[section][key], list): value = eval(config.get(section, key)) elif isinstance(self.defaults.__dict__[section][key], dict): value = eval(config.get(section, key)) if hasattr(self, "check_setting_%s_%s" % (section, key)): - exec("retval = self.check_setting_%s_%s(%r)" % (section, key, value)) + retval = eval("self.check_setting_%s_%s(%r)" % (section, key, value)) if not retval: # We just don't set it, check_setting_%s should have # taken care of the error messages continue if not self.defaults.__dict__[section][key] == value: if key.count('password') >= 1: log.debug( _("Setting %s_%s to '****' (from configuration file)") % ( section, key ), level=8 ) else: log.debug( _("Setting %s_%s to %r (from configuration file)") % ( section, key, value ), level=8 ) setattr(self, "%s_%s" % (section, key), value) def options_set_from_config(self): """ Sets the default configuration options from a configuration file. Configuration file may be customized using the --config CLI option """ log.debug(_("Setting options from configuration file"), level=4) # Check from which configuration file we should get the defaults # Other then default? self.config_file = self.defaults.config_file if hasattr(self, 'cli_keywords') and self.cli_keywords is not None: if not self.cli_keywords.config_file == self.defaults.config_file: self.config_file = self.cli_keywords.config_file config = self.check_config() self.load_config(config) def set_options_from_testing_section(self): """ Go through the options in the [testing] section if it exists. """ config = self.check_config() if not config.has_section('testing'): return for key in config.options('testing'): retval = False if isinstance(self.defaults.__dict__['testing'][key], int): value = config.getint('testing', key) elif isinstance(self.defaults.__dict__['testing'][key], bool): value = config.getboolean('testing', key) elif isinstance(self.defaults.__dict__['testing'][key], str): value = config.get('testing', key) elif isinstance(self.defaults.__dict__['testing'][key], list): value = eval(config.get('testing', key)) elif isinstance(self.defaults.__dict__['testing'][key], dict): value = eval(config.get('testing', key)) if hasattr(self, "check_setting_%s_%s" % ('testing', key)): - exec("retval = self.check_setting_%s_%s(%r)" % ('testing', key, value)) + retval = eval("self.check_setting_%s_%s(%r)" % ('testing', key, value)) if not retval: # We just don't set it, check_setting_%s should have # taken care of the error messages continue setattr(self, "%s_%s" % ('testing', key), value) if key.count('password') >= 1: log.debug( _("Setting %s_%s to '****' (from configuration file)") % ('testing', key), level=8 ) else: log.debug( _("Setting %s_%s to %r (from configuration file)") % ('testing', key, value), level=8 ) def check_config(self, val=None): """ Checks self.config_file or the filename passed using 'val' and returns a SafeConfigParser instance if everything is OK. """ if val is not None: config_file = val else: config_file = self.config_file if not os.access(config_file, os.R_OK): log.error(_("Configuration file %s not readable") % config_file) config = SafeConfigParser() log.debug(_("Reading configuration file %s") % config_file, level=8) try: config.read(config_file) except Exception: log.error(_("Invalid configuration file %s") % config_file) if not config.has_section("kolab"): log.warning( _("No master configuration section [kolab] in configuration file %s") % config_file ) return config def add_cli_parser_option_group(self, name): return self.cli_parser.add_option_group(name) def create_options_from_plugins(self): """ Create options from plugins. This function must be called separately from Conf.__init__(), or the configuration store is not yet done initializing when the plugins class and the plugins themselves go look for it. """ import pykolab.plugins self.plugins = pykolab.plugins.KolabPlugins() self.plugins.add_options(self.cli_parser) def create_options(self, load_plugins=True): """ Create the OptionParser for the options passed to us from runtime Command Line Interface. """ # Enterprise Linux 5 does not have an "epilog" parameter to OptionParser try: self.cli_parser = OptionParser(epilog=epilog) except Exception: self.cli_parser = OptionParser() # # Runtime Options # runtime_group = self.cli_parser.add_option_group(_("Runtime Options")) runtime_group.add_option( "-c", "--config", dest="config_file", action="store", default="/etc/kolab/kolab.conf", help=_("Configuration file to use") ) runtime_group.add_option( "-d", "--debug", dest="debuglevel", type='int', default=0, help=_( "Set the debugging verbosity. Maximum is 9, tracing protocols LDAP, SQL and IMAP." ) ) runtime_group.add_option( "-e", "--default", dest="answer_default", action="store_true", default=False, help=_("Use the default answer to all questions.") ) runtime_group.add_option( "-l", dest="loglevel", type='str', default="CRITICAL", help=_("Set the logging level. One of info, warn, error, critical or debug") ) runtime_group.add_option( "--logfile", dest="logfile", action="store", default="/var/log/kolab/pykolab.log", help=_("Log file to use") ) runtime_group.add_option( "-q", "--quiet", dest="quiet", action="store_true", default=False, help=_("Be quiet.") ) runtime_group.add_option( "-y", "--yes", dest="answer_yes", action="store_true", default=False, help=_("Answer yes to all questions.") ) def parse_options(self, fatal=True): """ Parse options passed to our call. """ if fatal: (self.cli_keywords, self.cli_args) = self.cli_parser.parse_args() def run(self): """ Run Forest, RUN! """ if self.cli_args: if len(self.cli_args) >= 1: if hasattr(self, "command_%s" % self.cli_args[0].replace('-', '_')): exec( "self.command_%s(%r)" % ( self.cli_args[0].replace('-', '_'), self.cli_args[1:] ) ) else: print(_("No command supplied"), file=sys.stderr) def command_dump(self, *args, **kw): """ Dumps applicable, valid configuration that is not defaults. """ if not self.cfg_parser: self.read_config() if not self.cfg_parser.has_section('kolab'): print("No section found for kolab", file=sys.stderr) sys.exit(1) # Get the sections, and then walk through the sections in a # sensible way. items = self.cfg_parser.options('kolab') items.sort() for item in items: mode = self.cfg_parser.get('kolab', item) print("%s = %s" % (item, mode)) if not self.cfg_parser.has_section(mode): print("WARNING: No configuration section %s for item %s" % (mode, item)) continue keys = self.cfg_parser.options(mode) keys.sort() if self.cfg_parser.has_option(mode, 'leave_this_one_to_me'): print("Ignoring section %s" % (mode)) continue for key in keys: print("%s_%s = %s" % (mode, key, self.cfg_parser.get(mode, key))) def read_config(self, value=None): """ Reads the configuration file, sets a self.cfg_parser. """ if not value: value = self.defaults.config_file if hasattr(self, 'cli_keywords') and self.cli_keywords is not None: value = self.cli_keywords.config_file self.cfg_parser = SafeConfigParser() self.cfg_parser.read(value) if hasattr(self, 'cli_keywords') and hasattr(self.cli_keywords, 'config_file'): self.cli_keywords.config_file = value self.defaults.config_file = value self.config_file = value def command_get(self, *args, **kw): """ Get a configuration option. Pass me a section and key please. """ - exec("args = %r" % args) + args = eval("%r" % args) print("%s/%s: %r" % (args[0], args[1], self.get(args[0], args[1]))) # if len(args) == 3: # # Return non-zero if no match # # Return zero if match # # Improvised "check" function def command_set(self, *args, **kw): """ Set a configuration option. Pass me a section, key and value please. Note that the section should already exist. TODO: Add a strict parameter TODO: Add key value checking """ if not self.cfg_parser: self.read_config() if not len(args) == 3: log.error(_("Insufficient options. Need section, key and value -in that order.")) if not self.cfg_parser.has_section(args[0]): log.error(_("No section '%s' exists.") % (args[0])) if '%' in args[2]: value = args[2].replace('%', '%%') else: value = args[2] self.cfg_parser.set(args[0], args[1], value) if hasattr(self, 'cli_keywords') and hasattr(self.cli_keywords, 'config_file'): fp = open(self.cli_keywords.config_file, "w+") self.cfg_parser.write(fp) fp.close() else: fp = open(self.config_file, "w+") self.cfg_parser.write(fp) fp.close() def create_logger(self): """ Create a logger instance using cli_options.debuglevel """ global log if self.cli_keywords.debuglevel is not None: loglevel = logging.DEBUG else: loglevel = logging.INFO self.cli_keywords.debuglevel = 0 self.debuglevel = self.cli_keywords.debuglevel # Initialize logger log = pykolab.logger.Logger( loglevel=loglevel, debuglevel=self.cli_keywords.debuglevel, logfile=self.cli_keywords.logfile ) def set_defaults_from_cli_options(self): for long_opt in self.cli_parser.__dict__['_long_opt']: if long_opt == "--help": continue setattr( self.defaults, self.cli_parser._long_opt[long_opt].dest, self.cli_parser._long_opt[long_opt].default ) # But, they should be available in our class as well for option in self.cli_parser.defaults: log.debug( _("Setting %s to %r (from the default values for CLI options)") % ( option, self.cli_parser.defaults[option] ), level=8 ) setattr(self, option, self.cli_parser.defaults[option]) def has_section(self, section): if not self.cfg_parser: self.read_config() return self.cfg_parser.has_section(section) def has_option(self, section, option): if not self.cfg_parser: self.read_config() return self.cfg_parser.has_option(section, option) def get_list(self, section, key, default=None): """ Gets a comma and/or space separated list from the configuration file and returns a list. """ values = [] untrimmed_values = [] setting = self.get_raw(section, key) if setting is None: return default if default else [] raw_values = setting.split(',') if raw_values is None: return default if default else [] for raw_value in raw_values: untrimmed_values.extend(raw_value.split(' ')) for value in untrimmed_values: if not value.strip() == "": values.append(value.strip().lower()) return values def get_raw(self, section, key, default=None): if not self.cfg_parser: self.read_config() if self.cfg_parser.has_option(section, key): return self.cfg_parser.get(section, key, raw=1) return default def get(self, section, key, default=None, quiet=False): """ Get a configuration option from our store, the configuration file, or an external source if we have some sort of function for it. TODO: Include getting the value from plugins through a hook. """ retval = False if not self.cfg_parser: self.read_config() # log.debug(_("Obtaining value for section %r, key %r") % (section, key), level=8) if self.cfg_parser.has_option(section, key): try: return self.cfg_parser.get(section, key) except Exception: self.read_config() return self.cfg_parser.get(section, key) if hasattr(self, "get_%s_%s" % (section, key)): try: - exec("retval = self.get_%s_%s(quiet)" % (section, key)) + retval = eval("self.get_%s_%s(quiet)" % (section, key)) except Exception: log.error( _("Could not execute configuration function: %s") % ( "get_%s_%s(quiet=%r)" % ( section, key, quiet ) ) ) return default return retval if quiet: return "" else: log.debug( _("Option %s/%s does not exist in config file %s, pulling from defaults") % ( section, key, self.config_file ), level=8 ) if hasattr(self.defaults, "%s_%s" % (section, key)): return getattr(self.defaults, "%s_%s" % (section, key)) elif hasattr(self.defaults, "%s" % (section)): if key in getattr(self.defaults, "%s" % (section)): _dict = getattr(self.defaults, "%s" % (section)) return _dict[key] else: log.warning(_("Option does not exist in defaults.")) return default else: log.warning(_("Option does not exist in defaults.")) return default def check_setting_config_file(self, value): if os.path.isfile(value): if os.access(value, os.R_OK): self.read_config(value=value) self.config_file = value return True else: log.error(_("Configuration file %s not readable.") % (value)) return False else: log.error(_("Configuration file %s does not exist.") % (value)) return False def check_setting_debuglevel(self, value): if value < 0: log.info( _( "WARNING: A negative debug level value does not " + "make this program be any more silent." ) ) elif value == 0: return True elif value <= 9: return True else: log.warning(_("This program has 9 levels of verbosity. Using the maximum of 9.")) return True def check_setting_saslauth_mode(self, value): if value: # TODO: I suppose this is platform specific if os.path.isfile("/var/run/saslauthd/mux"): if os.path.isfile("/var/run/saslauthd/saslauthd.pid"): log.error(_("Cannot start SASL authentication daemon")) return False else: try: os.remove("/var/run/saslauthd/mux") except IOError: log.error(_("Cannot start SASL authentication daemon")) return False elif os.path.isfile("/var/run/sasl2/mux"): if os.path.isfile("/var/run/sasl2/saslauthd.pid"): log.error(_("Cannot start SASL authentication daemon")) return False else: try: os.remove("/var/run/sasl2/mux") except IOError: log.error(_("Cannot start SASL authentication daemon")) return False return True def check_setting_use_imap(self, value): if value: try: import imaplib self.use_imap = value return True except ImportError: log.error(_("No imaplib library found.")) return False def check_setting_use_lmtp(self, value): if value: try: from smtplib import LMTP self.use_lmtp = value return True except ImportError: log.error(_("No LMTP class found in the smtplib library.")) return False def check_setting_use_mail(self, value): if value: try: from smtplib import SMTP self.use_mail = value return True except ImportError: log.error(_("No SMTP class found in the smtplib library.")) return False def check_setting_test_suites(self, value): # Attempt to load the suite, # Get the suite's options, # Set them here. if not hasattr(self, 'test_suites'): self.test_suites = [] if "zpush" in value: selectively = False for item in ['calendar', 'contacts', 'mail']: if self.cli_keywords.__dict__[item]: log.debug( _("Found you specified a specific set of items to test: %s") % (item), level=8 ) selectively = item if not selectively: self.calendar = True self.contacts = True self.mail = True else: log.debug(_("Selectively selecting: %s") % (selectively), level=8) setattr(self, selectively, True) self.test_suites.append('zpush') def check_setting_calendar(self, value): if self.cli_parser._long_opt['--calendar'].default == value: return False else: return True def check_setting_contacts(self, value): if self.cli_parser._long_opt['--contacts'].default == value: return False else: return True def check_setting_mail(self, value): if self.cli_parser._long_opt['--mail'].default == value: return False else: return True diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py index a773bfc..eb9acea 100644 --- a/pykolab/imap/__init__.py +++ b/pykolab/imap/__init__.py @@ -1,1273 +1,1273 @@ # -*- coding: utf-8 -*- # Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # from six import string_types import logging import re import time import socket import sys try: from urlparse import urlparse except ImportError: from urllib.parse import urlparse import pykolab from pykolab import utils from pykolab.translate import _ log = pykolab.getLogger('pykolab.imap') conf = pykolab.getConf() class IMAP(object): def __init__(self): # Pool of named IMAP connections, by hostname self._imap = {} # Place holder for the current IMAP connection self.imap = None def cleanup_acls(self, aci_subject): log.info( _("Cleaning up ACL entries for %s across all folders") % ( aci_subject ) ) lm_suffix = "" if len(aci_subject.split('@')) > 1: lm_suffix = "@%s" % (aci_subject.split('@')[1]) shared_folders = self.imap.lm("shared/*%s" % (lm_suffix)) user_folders = self.imap.lm("user/*%s" % (lm_suffix)) # For all folders (shared and user), ... folders = user_folders + shared_folders log.debug(_("Iterating over %d folders") % (len(folders)), level=5) # ... loop through them and ... for folder in folders: try: # ... list the ACL entries acls = self.imap.lam(folder) # For each ACL entry, see if we think it is a current, valid # entry for acl_entry in acls: # If the key 'acl_entry' does not exist in the dictionary # of valid ACL entries, this ACL entry has got to go. if acl_entry == aci_subject: # Set the ACL to '' (effectively deleting the ACL # entry) log.debug( _( "Removing acl %r for subject %r from folder %r" ) % ( acls[acl_entry], acl_entry, folder ), level=8 ) self.set_acl(folder, acl_entry, '') except Exception as errmsg: log.error( _("Failed to read/set ACL on folder %s: %r") % ( folder, errmsg ) ) def connect(self, uri=None, server=None, domain=None, login=True): """ Connect to the appropriate IMAP backend. Supply a domain (name space) configured in the configuration file as a section, with a setting 'imap_uri' to connect to a domain specific IMAP server, or specify an URI to connect to that particular IMAP server (in that order). Routines sitting behind this will take into account Cyrus IMAP Murder capabilities, brokering actions to take place against the correct server (such as a 'xfer' which needs to happen against the source backend). """ # TODO: We are currently compatible with one IMAP backend technology per # deployment. backend = conf.get('kolab', 'imap_backend') if domain is not None: self.domain = domain if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): backend = conf.get(domain, 'imap_backend') if uri is None: if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): uri = conf.get(domain, 'imap_uri') else: self.domain = None scheme = None hostname = None port = None if uri is None: uri = conf.get(backend, 'uri') result = urlparse(uri) if hasattr(result, 'netloc'): scheme = result.scheme if len(result.netloc.split(':')) > 1: hostname = result.netloc.split(':')[0] port = result.netloc.split(':')[1] else: hostname = result.netloc elif hasattr(result, 'hostname'): hostname = result.hostname else: scheme = uri.split(':')[0] (hostname, port) = uri.split('/')[2].split(':') if server is not None: hostname = server if scheme is None or scheme == "": scheme = 'imaps' if port is None: if scheme == "imaps": port = 993 elif scheme == "imap": port = 143 else: port = 993 uri = '%s://%s:%s' % (scheme, hostname, port) # Get the credentials admin_login = conf.get(backend, 'admin_login') admin_password = conf.get(backend, 'admin_password') if admin_password is None or admin_password == '': log.error(_("No administrator password is available.")) if hostname not in self._imap: if backend == 'cyrus-imap': from . import cyrus self._imap[hostname] = cyrus.Cyrus(uri) # Actually connect if login: log.debug(_("Logging on to Cyrus IMAP server %s") % (hostname), level=8) self._imap[hostname].login(admin_login, admin_password) self._imap[hostname].logged_in = True elif backend == 'dovecot': from . import dovecot self._imap[hostname] = dovecot.Dovecot(uri) # Actually connect if login: log.debug(_("Logging on to Dovecot IMAP server %s") % (hostname), level=8) self._imap[hostname].login(admin_login, admin_password) self._imap[hostname].logged_in = True else: import imaplib self._imap[hostname] = imaplib.IMAP4(hostname, port) # Actually connect if login: log.debug(_("Logging on to generic IMAP server %s") % (hostname), level=8) self._imap[hostname].login(admin_login, admin_password) self._imap[hostname].logged_in = True else: if not login: self.disconnect(hostname) self.connect(uri=uri, login=False) elif login and not hasattr(self._imap[hostname], 'logged_in'): self.disconnect(hostname) self.connect(uri=uri) else: try: if hasattr(self._imap[hostname], 'm'): self._imap[hostname].m.noop() elif hasattr(self._imap[hostname], 'noop') \ and callable(self._imap[hostname].noop): self._imap[hostname].noop() log.debug( _("Reusing existing IMAP server connection to %s") % (hostname), level=8 ) except Exception: log.debug(_("Reconnecting to IMAP server %s") % (hostname), level=8) self.disconnect(hostname) self.connect() # Set the newly created technology specific IMAP library as the current # IMAP connection to be used. self.imap = self._imap[hostname] if hasattr(self.imap, 'm') and hasattr(self.imap.m, 'sock'): self._set_socket_keepalive(self.imap.m.sock) elif hasattr(self.imap, 'sock'): self._set_socket_keepalive(self.imap.sock) def disconnect(self, server=None): if server is None: # No server specified, but make sure self.imap is None anyways if hasattr(self, 'imap'): del self.imap # Empty out self._imap as well for key in list(self._imap): del self._imap[key] else: if server in self._imap: del self._imap[server] else: log.warning( _("Called imap.disconnect() on a server that we had no connection to.") ) def create_folder(self, folder_path, server=None, partition=None): folder_path = self.folder_utf7(folder_path) if server is not None: self.connect(server=server) try: self._imap[server].cm(folder_path, partition=partition) return True except Exception as excpt: log.error( _("Could not create folder %r on server %r: %r") % (folder_path, server, excpt) ) else: try: self.imap.cm(folder_path, partition=partition) return True except Exception as excpt: log.error(_("Could not create folder %r: %r") % (folder_path, excpt)) return False def __getattr__(self, name): if hasattr(self.imap, name): return getattr(self.imap, name) if hasattr(self.imap, 'm'): if hasattr(self.imap.m, name): return getattr(self.imap.m, name) raise AttributeError(_("%r has no attribute %s") % (self, name)) def append(self, folder, message): return self.imap.m.append(self.folder_utf7(folder), None, None, message) def folder_utf7(self, folder): from pykolab import imap_utf7 return imap_utf7.encode(folder) def folder_utf8(self, folder): from pykolab import imap_utf7 return imap_utf7.decode(folder) def folder_quote(self, folder): return u'"' + str(folder).strip('"') + '"' def get_metadata(self, folder): """ Obtain all metadata entries on a folder """ metadata = {} _metadata = self.imap.getannotation(self.folder_utf7(folder), '*') for (k, v) in _metadata.items(): metadata[self.folder_utf8(k)] = v return metadata def get_separator(self): if not hasattr(self, 'imap') or self.imap is None: self.connect() if hasattr(self.imap, 'separator'): return self.imap.separator elif hasattr(self.imap, 'm') and hasattr(self.imap.m, 'separator'): return self.imap.m.separator else: return '/' def imap_murder(self): if hasattr(self.imap, 'murder') and self.imap.murder: return True else: return False def namespaces(self): """ Obtain the namespaces. Returns a tuple of: (str(personal) [, str(other users) [, list(shared)]]) """ _personal = None _other_users = None _shared = None (_response, _namespaces) = self.imap.m.namespace() if len(_namespaces) == 1: _namespaces = _namespaces[0] _namespaces = re.split(r"\)\)\s\(\(", _namespaces) if len(_namespaces) >= 3: _shared = [] _shared.append(' '.join(_namespaces[2].replace('((', '').replace('))', '').split()[:-1]).replace('"', '')) if len(_namespaces) >= 2: _other_users = ' '.join(_namespaces[1].replace('((', '').replace('))', '').split()[:-1]).replace('"', '') if len(_namespaces) >= 1: _personal = _namespaces[0].replace('((', '').replace('))', '').split()[0].replace('"', '') return (_personal, _other_users, _shared) def set_acl(self, folder, identifier, acl): """ Set an ACL entry on a folder. """ _acl = '' short_rights = { 'all': 'lrsedntxakcpiw', 'append': 'wip', 'full': 'lrswipkxtecdn', 'read': 'lrs', 'read-only': 'lrs', 'read-write': 'lrswitedn', 'post': 'p', 'semi-full': 'lrswit', 'write': 'lrswite', } if acl in short_rights: acl = short_rights[acl] else: for char in acl: if char in "-+": continue if not char in short_rights['all']: log.error(_("Invalid access identifier %r for subject %r") % (acl, identifier)) return False # Special treatment for '-' and '+' characters if '+' in acl or '-' in acl: acl_map = { 'set': '', 'subtract': '', 'add': '' } mode = 'set' for char in acl: if char == '-': mode = 'subtract' continue if char == '+': mode = 'add' continue acl_map[mode] += char current_acls = self.imap.lam(self.folder_utf7(folder)) for current_acl in current_acls: if current_acl == identifier: _acl = current_acls[current_acl] break _acl = _acl + acl_map['set'] + acl_map['add'] _acl = [x for x in _acl.split() if x not in acl_map['subtract'].split()] acl = ''.join(list(set(_acl))) try: self.imap.sam(self.folder_utf7(folder), identifier, acl) except Exception as errmsg: log.error( _("Could not set ACL for %s on folder %s: %r") % ( identifier, folder, errmsg ) ) def del_acl(self, folder, identifier): try: self.imap.dam(self.folder_utf7(folder), identifier) except Exception as errmsg: log.error( _("Could not delete ACL for %s on folder %s: %r") % ( identifier, folder, errmsg ) ) def set_metadata(self, folder, metadata_path, metadata_value, shared=True): """ Set a metadata entry on a folder """ if metadata_path.startswith('/shared/'): shared = True metadata_path = metadata_path.replace('/shared/', '/') elif metadata_path.startswith('/private/'): shared = False metadata_path = metadata_path.replace('/private/', '/') self.imap._setannotation(self.folder_utf7(folder), metadata_path, metadata_value, shared) def shared_folder_create(self, folder_path, server=None): """ Create a shared folder. """ folder_name = "shared%s%s" % (self.get_separator(), folder_path) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] log.info(_("Creating new shared folder %s") % (folder_name)) self.create_folder(folder_name, server) def shared_folder_exists(self, folder_path): """ Check if a shared mailbox exists. """ folder_name = 'shared%s%s' % (self.get_separator(), folder_path) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] return self.has_folder(folder_name) def shared_folder_rename(self, old, new): if not self.has_folder(old): log.error("Shared Folder rename error: Folder %s does not exist" % (old)) return if self.has_folder(new): log.error("Shared Folder rename error: Folder %s already exists" % (new)) return self.imap._rename(old, new) def shared_folder_set_type(self, folder_path, folder_type): folder_name = 'shared%s%s' % (self.get_separator(), folder_path) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] self.set_metadata(folder_name, '/shared/vendor/kolab/folder-type', folder_type) def shared_mailbox_create(self, mailbox_base_name, server=None): """ Create a shared folder. """ self.connect() folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] log.info(_("Creating new shared folder %s") %(mailbox_base_name)) self.create_folder(folder_name, server) def shared_mailbox_exists(self, mailbox_base_name): """ Check if a shared mailbox exists. """ self.connect() folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] return self.has_folder(folder_name) def user_mailbox_create(self, mailbox_base_name, server=None): """ Create a user mailbox. Returns the full path to the new mailbox folder. """ # TODO: Whether or not to lowercase the mailbox name is really up to the # IMAP server setting username_tolower (normalize_uid, lmtp_downcase_rcpt). self.connect() if not mailbox_base_name == mailbox_base_name.lower(): log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) mailbox_base_name = mailbox_base_name.lower() folder_name = "user%s%s" % (self.get_separator(), mailbox_base_name) log.info(_("Creating new mailbox for user %s") %(mailbox_base_name)) success = self._create_folder_waiting(folder_name, server) if not success: log.error(_("Could not create the mailbox for user %s, aborting." % (mailbox_base_name))) return False _additional_folders = None if not hasattr(self, 'domain'): self.domain = None if self.domain is None and len(mailbox_base_name.split('@')) > 1: self.domain = mailbox_base_name.split('@')[1] if not self.domain is None: if conf.has_option(self.domain, "autocreate_folders"): _additional_folders = conf.get_raw( self.domain, "autocreate_folders" ) else: from pykolab.auth import Auth auth = Auth() auth.connect() domains = auth.list_domains(self.domain) auth.disconnect() if len(domains) > 0: if self.domain in domains: primary = domains[self.domain] if conf.has_option(primary, "autocreate_folders"): _additional_folders = conf.get_raw( primary, "autocreate_folders" ) if _additional_folders is None: if conf.has_option('kolab', "autocreate_folders"): _additional_folders = conf.get_raw( 'kolab', "autocreate_folders" ) additional_folders = conf.plugins.exec_hook( "create_user_folders", kw={ 'folder': folder_name, 'additional_folders': _additional_folders } ) if additional_folders is not None: self.user_mailbox_create_additional_folders( mailbox_base_name, additional_folders ) if not self.domain is None: if conf.has_option(self.domain, "sieve_mgmt"): sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt') if utils.true_or_false(sieve_mgmt_enabled): conf.plugins.exec_hook( 'sieve_mgmt_refresh', kw={ 'user': mailbox_base_name } ) return folder_name def user_mailbox_create_additional_folders(self, user, additional_folders): log.debug( _("Creating additional folders for user %s") % (user), level=8 ) backend = conf.get('kolab', 'imap_backend') admin_login = conf.get(backend, 'admin_login') admin_password = conf.get(backend, 'admin_password') folder = 'user%s%s' % (self.get_separator(), user) if self.imap_murder(): server = self.user_mailbox_server(folder) else: server = None success = False last_log = time.time() while not success: try: self.disconnect() self.connect(login=False, server=server) self.login_plain(admin_login, admin_password, user) (personal, other, shared) = self.namespaces() success = True except Exception as errmsg: if time.time() - last_log > 5 and self.imap_murder(): log.debug(_("Waiting for the Cyrus murder to settle... %r") % (errmsg)) last_log = time.time() if conf.debuglevel > 8: import traceback traceback.print_exc() time.sleep(0.5) for additional_folder in additional_folders: _add_folder = {} folder_name = additional_folder if not folder_name.startswith(personal): log.error(_("Correcting additional folder name from %r to %r") % (folder_name, "%s%s" % (personal, folder_name))) folder_name = "%s%s" % (personal, folder_name) success = self._create_folder_waiting(folder_name) if not success: log.warning(_("Failed to create folder: %s") % (folder_name)) continue if "annotations" in additional_folders[additional_folder]: for annotation in additional_folders[additional_folder]["annotations"]: self.set_metadata( folder_name, "%s" % (annotation), "%s" % (additional_folders[additional_folder]["annotations"][annotation]) ) if "acls" in additional_folders[additional_folder]: for acl in additional_folders[additional_folder]["acls"]: self.set_acl( folder_name, "%s" % (acl), "%s" % (additional_folders[additional_folder]["acls"][acl]) ) if len(user.split('@')) > 1: localpart = user.split('@')[0] domain = user.split('@')[1] domain_suffix = "@%s" % (domain) else: localpart = user domain = None domain_suffix = "" if domain is not None: if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): backend = conf.get(domain, 'imap_backend') if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): uri = conf.get(domain, 'imap_uri') else: uri = None log.debug(_("Subscribing user to the additional folders"), level=8) _tests = [] # Subscribe only to personal folders (personal, other, shared) = self.namespaces() if other is not None: _tests.append(other) if shared is not None: for _shared in shared: _tests.append(_shared) log.debug(_("Using the following tests for folder subscriptions:"), level=8) for _test in _tests: log.debug(_(" %r") % (_test), level=8) for _folder in self.lm(): log.debug(_("Folder %s") % (_folder), level=8) _subscribe = True for _test in _tests: if not _subscribe: continue if _folder.startswith(_test): _subscribe = False if _subscribe: log.debug(_("Subscribing %s to folder %s") % (user, _folder), level=8) try: self.subscribe(_folder) except Exception as errmsg: log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg)) self.logout() self.connect(domain=self.domain) for additional_folder in additional_folders: if additional_folder.startswith(personal) and not personal == '': folder_name = additional_folder.replace(personal, '') else: folder_name = additional_folder folder_name = "user%s%s%s%s%s" % ( self.get_separator(), localpart, self.get_separator(), folder_name, domain_suffix ) if "quota" in additional_folders[additional_folder]: try: self.imap.sq( folder_name, additional_folders[additional_folder]['quota'] ) except Exception as errmsg: log.error(_("Could not set quota on %s") % (additional_folder)) if "partition" in additional_folders[additional_folder]: partition = additional_folders[additional_folder]["partition"] try: self.imap._rename(folder_name, folder_name, partition) except: log.error(_("Could not rename %s to reside on partition %s") % (folder_name, partition)) def _create_folder_waiting(self, folder_name, server=None): """ Create a folder and wait to make sure it exists """ created = False try: max_tries = 10 while not created and max_tries > 0: created = self.create_folder(folder_name, server) if not created: self.disconnect() max_tries -= 1 time.sleep(1) self.connect() # In a Cyrus IMAP Murder topology, wait for the murder to have settled if created and self.imap_murder(): success = False last_log = time.time() reconnect_counter = 0 while not success: success = self.has_folder(folder_name) if not success: if time.time() - last_log > 5: reconnect_counter += 1 log.info(_("Waiting for the Cyrus IMAP Murder to settle...")) if reconnect_counter == 6: log.warning(_("Waited for 15 seconds, going to reconnect")) reconnect_counter = 0 self.disconnect() self.connect() last_log = time.time() time.sleep(0.5) except: if conf.debuglevel > 8: import traceback traceback.print_exc() return created def user_mailbox_delete(self, mailbox_base_name): """ Delete a user mailbox. """ self.connect() folder = "user%s%s" %(self.get_separator(), mailbox_base_name) self.delete_mailfolder(folder) self.cleanup_acls(mailbox_base_name) def user_mailbox_exists(self, mailbox_base_name): """ Check if a user mailbox exists. """ self.connect() if not mailbox_base_name == mailbox_base_name.lower(): log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) mailbox_base_name = mailbox_base_name.lower() return self.has_folder('user%s%s' %(self.get_separator(), mailbox_base_name)) def user_mailbox_quota(self, mailbox_quota): pass def user_mailbox_rename(self, old_name, new_name, partition=None): self.connect() old_name = "user%s%s" % (self.get_separator(), old_name) new_name = "user%s%s" % (self.get_separator(), new_name) if old_name == new_name and partition is None: return if not self.has_folder(old_name): log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) if not self.has_folder(new_name) or not partition is None: log.info(_("Renaming INBOX from %s to %s") % (old_name, new_name)) try: self.imap.rename(old_name, new_name, partition) except: log.error(_("Could not rename INBOX folder %s to %s") % (old_name, new_name)) else: log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name, new_name)) def user_mailbox_server(self, mailbox): server = self.imap.find_mailfolder_server(mailbox.lower()).lower() log.debug(_("Server for mailbox %r is %r") % (mailbox, server), level=8) return server def has_folder(self, folder): """ Check if the environment has a folder named folder. """ folders = self.imap.lm(self.folder_utf7(folder)) log.debug(_("Looking for folder '%s', we found folders: %r") % (folder, [self.folder_utf8(x) for x in folders]), level=8) # Greater then one, this folder may have subfolders. if len(folders) > 0: return True else: return False def _set_socket_keepalive(self, sock): sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) with open('/proc/sys/net/ipv4/tcp_keepalive_time', 'r') as f: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, (int)(f.read())) with open('/proc/sys/net/ipv4/tcp_keepalive_intvl', 'r') as f: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, (int)(f.read())) with open('/proc/sys/net/ipv4/tcp_keepalive_probes', 'r') as f: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, (int)(f.read())) def _set_kolab_mailfolder_acls(self, acls, folder=None, update=False): # special case, folder has no ACLs assigned and update was requested, # remove all existing ACL entries if update is True and isinstance(acls, list) and len(acls) == 0: acls = self.list_acls(folder) for subject in acls: log.debug( _("Removing ACL rights %s for subject %s on folder " + \ "%s") % (acls[subject], subject, folder), level=8) self.set_acl(folder, subject, '') return if isinstance(acls, string_types): acls = [ acls ] old_acls = None for acl in acls: - exec("acl = %s" % (acl)) + acl = eval("%s" % (acl)) subject = acl[0] rights = acl[1] if len(acl) == 3: epoch = acl[2] else: epoch = (int)(time.time()) + 3600 # update mode, check existing entries if update is True: if old_acls is None: old_acls = self.list_acls(folder) for old_subject in old_acls: old_acls[old_subject] = old_acls[old_subject] if subject in old_acls: old_acls[subject] = None if epoch > (int)(time.time()): log.debug( _("Setting ACL rights %s for subject %s on folder " + \ "%s") % (rights, subject, folder), level=8) self.set_acl( folder, "%s" % (subject), "%s" % (rights) ) else: log.debug( _("Removing ACL rights %s for subject %s on folder " + \ "%s") % (rights, subject, folder), level=8) self.set_acl( folder, "%s" % (subject), "" ) # update mode, unset removed ACL entries if old_acls is not None: for subject in old_acls: if old_acls[subject] is not None: log.debug( _("Removing ACL rights %s for subject %s on folder " + \ "%s") % (old_acls[subject], subject, folder), level=8) self.set_acl(folder, subject, '') pass """ Blah functions """ def move_user_folders(self, users=[], domain=None): for user in users: if type(user) == dict: if 'old_mail' in user: inbox = "user/%s" % (user['mail']) old_inbox = "user/%s" % (user['old_mail']) if self.has_folder(old_inbox): log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) if not self.has_folder(inbox): log.info(_("Renaming INBOX from %s to %s") % (old_inbox, inbox)) self.imap.rename(old_inbox, inbox) self.inbox_folders.append(inbox) else: log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox, inbox)) else: log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) else: log.debug(_("Value for user is not a dictionary"), level=8) def set_quota(self, folder, quota): i = 0 while i < 10: try: self.imap._setquota(folder, quota) i = 10 except: self.disconnect() self.connect() i += 1 def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): """ Sets the quota in IMAP using the authentication and authorization database 'quota' attribute for the users listed in parameter 'users' """ if conf.has_option(primary_domain, 'quota_attribute'): _quota_attr = conf.get(primary_domain, 'quota_attribute') else: auth_mechanism = conf.get('kolab', 'auth_mechanism') _quota_attr = conf.get(auth_mechanism, 'quota_attribute') _inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') default_quota = auth.domain_default_quota(primary_domain) if default_quota == "" or default_quota is None: default_quota = 0 if len(users) == 0: users = auth.list_users(primary_domain) for user in users: quota = None if type(user) == dict: if _quota_attr in user: if type(user[_quota_attr]) == list: quota = user[_quota_attr].pop(0) elif type(user[_quota_attr]) == str: quota = user[_quota_attr] else: _quota = auth.get_user_attribute(primary_domain, user, _quota_attr) if _quota is None: quota = 0 else: quota = _quota if _inbox_folder_attr not in user: continue else: if type(user[_inbox_folder_attr]) == list: folder = "user/%s" % user[_inbox_folder_attr].pop(0) elif type(user[_inbox_folder_attr]) == str: folder = "user/%s" % user[_inbox_folder_attr] elif type(user) == str: quota = auth.get_user_attribute(user, _quota_attr) folder = "user/%s" % (user) folder = folder.lower() try: (used, current_quota) = self.imap.lq(folder) except: # TODO: Go in fact correct the quota. log.warning(_("Cannot get current IMAP quota for folder %s") % (folder)) used = 0 current_quota = 0 new_quota = conf.plugins.exec_hook("set_user_folder_quota", kw={ 'used': used, 'current_quota': current_quota, 'new_quota': (int)(quota), 'default_quota': (int)(default_quota), 'user': user } ) log.debug(_("Quota for %s currently is %s") % (folder, current_quota), level=7) if new_quota is None: continue if not int(new_quota) == int(quota): log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder, int(new_quota))) quota = int(new_quota) auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota) if not int(current_quota) == int(quota): log.info(_("Correcting quota for %s to %s (currently %s)") % (folder, quota, current_quota)) self.imap._setquota(folder, quota) def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): if conf.has_option(primary_domain, 'mailserver_attribute'): _mailserver_attr = conf.get(primary_domain, 'mailserver_attribute') else: auth_mechanism = conf.get('kolab', 'auth_mechanism') _mailserver_attr = conf.get(auth_mechanism, 'mailserver_attribute') _inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') if len(users) == 0: users = auth.list_users(primary_domain) for user in users: mailhost = None if type(user) == dict: if _mailserver_attr in user: if type(user[_mailserver_attr]) == list: _mailserver = user[_mailserver_attr].pop(0) elif type(user[_mailserver_attr]) == str: _mailserver = user[_mailserver_attr] else: _mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr) if _inbox_folder_attr not in user: continue else: if type(user[_inbox_folder_attr]) == list: folder = "user/%s" % user[_inbox_folder_attr].pop(0) elif type(user[_inbox_folder_attr]) == str: folder = "user/%s" % user[_inbox_folder_attr] elif type(user) == str: _mailserver = auth.get_user_attribute(user, _mailserver_attr) folder = "user/%s" % (user) folder = folder.lower() _current_mailserver = self.imap.find_mailfolder_server(folder) if _mailserver is not None: # TODO: if not _current_mailserver == _mailserver: self.imap._xfer(folder, _current_mailserver, _mailserver) else: auth.set_user_attribute(primary_domain, user, _mailserver_attr, _current_mailserver) def parse_mailfolder(self, mailfolder): return self.imap.parse_mailfolder(mailfolder) def expunge_user_folders(self, inbox_folders=None): """ Delete folders that have no equivalent user qualifier in the list of users passed to this function, ... TODO: Explain the domain parameter, and actually get it to work properly. This also relates to threading for multi-domain deployments. Parameters: users A list of users. Can be a list of user qualifiers, e.g. [ 'user1', 'user2' ] or a list of user attribute dictionaries, e.g. [ { 'user1': { 'attr': 'value' } } ] primary_domain, secondary_domains """ if inbox_folders is None: inbox_folders = [] folders = self.list_user_folders() for folder in folders: log.debug(_("Checking folder: %s") % (folder), level=1) try: if inbox_folders.index(folder) > -1: continue else: log.info(_("Folder has no corresponding user (1): %s") % (folder)) self.delete_mailfolder("user/%s" % (folder)) except: log.info(_("Folder has no corresponding user (2): %s") % (folder)) try: self.delete_mailfolder("user/%s" % (folder)) except: pass def delete_mailfolder(self, mailfolder_path): """ Deletes a mail folder described by mailfolder_path. """ mbox_parts = self.parse_mailfolder(mailfolder_path) if mbox_parts is None: # We got user identifier only log.error(_("Please don't give us just a user identifier")) return log.info(_("Deleting folder %s") % (mailfolder_path)) self.imap.dm(self.folder_utf7(mailfolder_path)) def get_quota(self, mailfolder_path): try: return self.lq(self.folder_utf7(mailfolder_path)) except: return def get_quota_root(self, mailfolder_path): return self.lqr(self.folder_utf7(mailfolder_path)) def list_acls(self, folder): """ List the ACL entries on a folder """ return self.imap.lam(self.folder_utf7(folder)) def list_folders(self, pattern): return [self.folder_utf8(x) for x in self.lm(self.folder_utf7(pattern))] def list_user_folders(self, primary_domain=None, secondary_domains=[]): """ List the INBOX folders in the IMAP backend. Returns a list of unique base folder names. """ _folders = self.imap.lm("user/%") # TODO: Replace the .* below with a regex representing acceptable DNS # domain names. domain_re = ".*\.?%s$" acceptable_domain_name_res = [] if primary_domain is not None: for domain in [ primary_domain ] + secondary_domains: acceptable_domain_name_res.append(domain_re % (domain)) folders = [] for folder in _folders: folder_name = None if len(folder.split('@')) > 1: # TODO: acceptable domain name spaces #acceptable = False #for domain_name_re in acceptable_domain_name_res: #prog = re.compile(domain_name_re) #if prog.match(folder.split('@')[1]): #print "Acceptable indeed" #acceptable = True #if not acceptable: #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1], folder, domain_name_re) #if acceptable: #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0], folder.split('@')[1]) folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0], folder.split('@')[1]) else: folder_name = "%s" % (folder.split(self.get_separator())[1]) if folder_name is not None: if not folder_name in folders: folders.append(folder_name) return folders def lm(self, *args, **kw): return self.imap.lm(*args, **kw) def lq(self, *args, **kw): return self.imap.lq(*args, **kw) def lqr(self, *args, **kw): try: return self.imap.lqr(*args, **kw) except: return (None, None, None) def undelete_mailfolder(self, *args, **kw): self.imap.undelete_mailfolder(*args, **kw) diff --git a/pykolab/plugins/__init__.py b/pykolab/plugins/__init__.py index 96e0044..ab110fb 100644 --- a/pykolab/plugins/__init__.py +++ b/pykolab/plugins/__init__.py @@ -1,272 +1,272 @@ # -*- coding: utf-8 -*- # Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import logging import os import pdb import sys import traceback import pykolab from pykolab.translate import _ log = pykolab.getLogger('pykolab.plugins') conf = pykolab.getConf() class KolabPlugins(object): """ Detects, loads and interfaces with plugins for different Kolab components. """ def __init__(self): """ Searches the plugin directory for plugins, and loads them into a list. """ self.plugins = {} for plugin_path in [ os.path.dirname(__file__), '/usr/share/pykolab/plugins', './pykolab/plugins' ]: if os.path.isdir(plugin_path): for plugin in os.listdir(plugin_path): if os.path.isdir('%s/%s/' % (plugin_path, plugin, )): self.plugins[plugin] = False self.check_plugins() def check_plugins(self): """ Checks all plugins in self.plugins and sets the values to True (loadable) or False -- not enabled, not installed or not loadable. """ for plugin in self.plugins: try: exec("from pykolab.plugins import %s" % (plugin)) self.plugins[plugin] = True self.load_plugins(plugins=[plugin]) except ImportError as e: log.error(_("ImportError for plugin %s: %s") % (plugin, e)) traceback.print_exc() self.plugins[plugin] = False except RuntimeError as e: log.error( _("RuntimeError for plugin %s: %s") % (plugin, e)) traceback.print_exc() self.plugins[plugin] = False except Exception as e: log.error(_("Plugin %s failed to load (%s: %s)") % (plugin, e.__class__, e)) traceback.print_exc() except: traceback.print_exc() def load_plugins(self, plugins=[]): """ Loads plugins specified by a list of plugins or loads them all """ if len(plugins) < 1: plugins = self.plugins.keys() for plugin in plugins: if self.plugins[plugin]: try: exec("self.%s = %s.Kolab%s()" % (plugin, plugin, plugin.capitalize())) except: # TODO: A little better verbosity please! traceback.print_exc() def set_defaults(self, defaults, plugins=[]): """ Test for a function set_defaults() in all available and loaded plugins and execute plugin.set_defaults() """ if len(plugins) < 1: plugins = self.plugins.keys() for plugin in plugins: if not self.plugins[plugin]: continue if not hasattr(self, plugin): continue if hasattr(getattr(self, plugin), "set_defaults"): try: getattr(self, plugin).set_defaults(defaults) except TypeError as e: log.error(_("Cannot set defaults for plugin %s: %s") % (plugin, e)) except RuntimeError as e: log.error(_("Cannot set defaults for plugin %s: %s") % (plugin, e)) except: log.error(_("Cannot set defaults for plugin %s: Unknown Error") % (plugin)) else: log.debug(_("Not setting defaults for plugin %s: No function 'set_defaults()'") % plugin, level=5) def set_runtime(self, runtime, plugins=[]): """ Set runtime variables from plugins, like 'i_did_all_this' """ if len(plugins) < 1: plugins = self.plugins.keys() for plugin in plugins: if not self.plugins[plugin]: continue if not hasattr(self, plugin): continue if hasattr(getattr(self, plugin), "set_runtime"): try: getattr(self, plugin).set_runtime(runtime) except RuntimeError as e: log.error(_("Cannot set runtime for plugin %s: %s") % (plugin, e)) else: log.debug(_("Not setting runtime for plugin %s: No function 'set_runtime()'") % (plugin), level=5) def add_options(self, parser, plugins=[]): """ Add options specified in a plugin to parser. Takes a list of plugin names or does them all """ if len(plugins) < 1: plugins = self.plugins.keys() for plugin in plugins: if not self.plugins[plugin]: continue if not hasattr(self, plugin): continue if hasattr(getattr(self, plugin), "add_options"): try: exec("self.%s.add_options(parser)" % plugin) except RuntimeError as e: log.error(_("Cannot add options for plugin %s: %s") % (plugin, e)) except TypeError as e: log.error(_("Cannot add options for plugin %s: %s") % (plugin, e)) else: log.debug(_("Not adding options for plugin %s: No function 'add_options()'") % plugin, level=5) def check_options(self, plugins=[]): """ Executes plugin.check_plugins() for all enabled plugins or the list of plugin names specified. """ if len(plugins) < 1: plugins = self.plugins.keys() for plugin in plugins: if not self.plugins[plugin]: continue if not hasattr(self, plugin): continue if hasattr(getattr(self, plugin), "check_options"): try: exec("self.%s.check_options()" % plugin) except AttributeError as e: log.error(_("Cannot check options for plugin %s: %s") % (plugin, e)) else: log.debug(_("Not checking options for plugin %s: No function 'check_options()'") % (plugin), level=5) def plugin_check_setting(self, func, option, val, plugins=[]): """ Checks one setting specified by 'option' against the 'val' it is passed by all plugins or by the list of plugins specified """ if len(plugins) < 1: plugins = self.plugins.keys() for plugin in plugins: if not self.plugins[plugin]: continue if not hasattr(self, plugin): continue if hasattr(getattr(self, plugin), "%s_%s" % (func, option)): - exec("retval = getattr(self, plugin).%s_%s(val)" % (func, option)) + retval = eval("getattr(self, plugin).%s_%s(val)" % (func, option)) return retval return False def exec_hook(self, hook, plugins=[], kw={}, args=()): """Execute a hook""" retval = None if len(plugins) < 1: plugins = self.plugins.keys() for plugin in plugins: if not self.plugins[plugin]: continue if not hasattr(self, plugin): continue if hasattr(getattr(self, plugin), hook): try: log.debug(_("Executing hook %s for plugin %s") % (hook, plugin), level=8) func = getattr(getattr(self, plugin), hook) retval = func(*args, **kw) except TypeError as errmsg: log.error( _("Cannot execute hook %s for plugin %s: %s") % (hook, plugin, errmsg) ) log.error(traceback.format_exc()) except AttributeError as errmsg: log.error( _("Cannot execute hook %s for plugin %s: %s") % (hook, plugin, errmsg) ) log.error(traceback.format_exc()) return retval def return_true_boolean_from_plugins(self, bool, plugins=[]): """ Given the name of a boolean, walks all specified plugins, or all available plugins, and returns True if a plugin has it set to true """ if len(plugins) < 1: plugins = self.plugins.keys() retval = False for plugin in plugins: if not self.plugins[plugin]: continue if not hasattr(self, plugin): continue if hasattr(getattr(self, plugin), bool): try: - exec("boolval = self.%s.%s" % (plugin, bool)) + boolval = eval("self.%s.%s" % (plugin, bool)) except AttributeError: pass else: boolval = None if boolval: retval = True return retval diff --git a/pykolab/plugins/defaultfolders/__init__.py b/pykolab/plugins/defaultfolders/__init__.py index a4a59b0..cfa6d28 100644 --- a/pykolab/plugins/defaultfolders/__init__.py +++ b/pykolab/plugins/defaultfolders/__init__.py @@ -1,56 +1,56 @@ # -*- coding: utf-8 -*- # Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import pykolab from pykolab.translate import _ log = pykolab.getLogger('pykolab.plugins.defaultfolders') conf = pykolab.getConf() class KolabDefaultfolders(object): """ Example plugin to create a set of default folders. """ def __init__(self): pass def add_options(self, *args, **kw): pass def create_user_folders(self, *args, **kw): """ The arguments passed to the 'create_user_folders' hook: additional_folders - additional folders to create user_folder - user folder """ if 'additional_folders' not in kw: log.error(_("Plugin %s called without required keyword %s.") % ("defaultfolders", "additional_folders")) return {} try: - exec("additional_folders = %s" % (kw['additional_folders'])) + additional_folders = eval("%s" % (kw['additional_folders'])) except Exception: log.error(_("Could not parse additional_folders")) return {} return additional_folders diff --git a/pykolab/plugins/recipientpolicy/__init__.py b/pykolab/plugins/recipientpolicy/__init__.py index 330ddfa..da1db06 100644 --- a/pykolab/plugins/recipientpolicy/__init__.py +++ b/pykolab/plugins/recipientpolicy/__init__.py @@ -1,163 +1,163 @@ # -*- coding: utf-8 -*- # Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import pykolab from pykolab import utils from pykolab.translate import _ conf = pykolab.getConf() log = pykolab.getLogger('pykolab.plugins.recipientpolicy') class KolabRecipientpolicy(object): """ Example plugin making quota adjustments given arbitrary conditions. """ def __init__(self): pass def add_options(self, *args, **kw): pass #def mail_domain_space_policy_check(self, kw={}, args=()): #(mail, alternative_mail, domain_name, domain_root_dn) = args ## Your actions go here. For example: #return (mail, alternative_mail) def set_primary_mail(self, *args, **kw): """ The arguments passed to the 'set_user_attrs_mail' hook: primary_mail - the policy user_attrs - the current user attributes primary_domain - the domain to use in the primary mail attribute secondary_domains - the secondary domains that are aliases Return the new primary mail address """ user_attrs = utils.normalize(kw['entry']) if 'domain' not in user_attrs: user_attrs['domain'] = kw['primary_domain'] elif not user_attrs['domain'] == kw['primary_domain']: user_attrs['domain'] = kw['primary_domain'] if 'preferredlanguage' not in user_attrs: default_locale = conf.get(user_attrs['domain'], 'default_locale') if default_locale == None: default_locale = conf.get('kolab', 'default_locale') if default_locale == None: default_locale = 'en_US' user_attrs['preferredlanguage'] = default_locale try: mail = kw['primary_mail'] % user_attrs mail = utils.translate(mail, user_attrs['preferredlanguage']) mail = mail.lower() return mail except KeyError: log.warning(_("Attribute substitution for 'mail' failed in Recipient Policy")) if 'mail' in user_attrs: return user_attrs['mail'] else: return None def set_secondary_mail(self, *args, **kw): """ The arguments passed to the 'set_user_attrs_alternative_mail' hook: primary_mail - the policy user_attrs - the current user attributes primary_domain - the domain to use in the primary mail attribute secondary_domains - the secondary domains that are aliases Return a list of secondary mail addresses """ user_attrs = utils.normalize(kw['entry']) if 'domain' not in user_attrs: user_attrs['domain'] = kw['primary_domain'] elif not user_attrs['domain'] == kw['primary_domain']: user_attrs['domain'] = kw['primary_domain'] if 'preferredlanguage' not in user_attrs: default_locale = conf.get(user_attrs['domain'], 'default_locale') if default_locale == None: default_locale = conf.get(user_attrs['domain'], 'default_locale') if default_locale == None: default_locale = 'en_US' user_attrs['preferredlanguage'] = default_locale try: - exec("alternative_mail_routines = %s" % kw['secondary_mail']) + alternative_mail_routines = eval("%s" % kw['secondary_mail']) except Exception: log.error(_("Could not parse the alternative mail routines")) alternative_mail = [] log.debug(_("Alternative mail routines: %r") % (alternative_mail_routines), level=8) _domains = [ kw['primary_domain'] ] + kw['secondary_domains'] for attr in [ 'givenname', 'sn', 'surname' ]: try: user_attrs[attr] = utils.translate(user_attrs[attr], user_attrs['preferredlanguage']) except Exception: log.error(_("An error occurred in composing the secondary mail attribute for entry %r") % (user_attrs['id'])) if conf.debuglevel > 8: import traceback traceback.print_exc() return [] for number in alternative_mail_routines: for routine in alternative_mail_routines[number]: try: - exec("retval = '%s'.%s" % (routine,alternative_mail_routines[number][routine] % user_attrs)) + retval = eval("'%s'.%s" % (routine, alternative_mail_routines[number][routine] % user_attrs)) log.debug(_("Appending additional mail address: %s") % (retval), level=8) alternative_mail.append(retval) except Exception as errmsg: log.error(_("Policy for secondary email address failed: %r") % (errmsg)) if conf.debuglevel > 8: import traceback traceback.print_exc() return [] for _domain in kw['secondary_domains']: user_attrs['domain'] = _domain try: - exec("retval = '%s'.%s" % (routine,alternative_mail_routines[number][routine] % user_attrs)) + retval = eval("'%s'.%s" % (routine, alternative_mail_routines[number][routine] % user_attrs)) log.debug(_("Appending additional mail address: %s") % (retval), level=8) alternative_mail.append(retval) except KeyError: log.warning(_("Attribute substitution for 'alternative_mail' failed in Recipient Policy")) alternative_mail = utils.normalize(alternative_mail) alternative_mail = list(set(alternative_mail)) return alternative_mail diff --git a/pykolab/wap_client/__init__.py b/pykolab/wap_client/__init__.py index 176ac91..4ab941d 100644 --- a/pykolab/wap_client/__init__.py +++ b/pykolab/wap_client/__init__.py @@ -1,667 +1,667 @@ import json try: import httplib except ImportError: import http.client as httplib import urllib import sys try: from urlparse import urlparse except ImportError: from urllib.parse import urlparse import pykolab from pykolab import utils from pykolab.translate import _ log = pykolab.getLogger('pykolab.wap_client') conf = pykolab.getConf() if not hasattr(conf, 'defaults'): conf.finalize_conf() API_HOSTNAME = "localhost" API_SCHEME = "http" API_PORT = 80 API_SSL = False API_BASE = "/kolab-webadmin/api/" kolab_wap_url = conf.get('kolab_wap', 'api_url') if not kolab_wap_url == None: result = urlparse(kolab_wap_url) else: result = None if hasattr(result, 'scheme') and result.scheme == 'https': API_SSL = True API_PORT = 443 if hasattr(result, 'hostname'): API_HOSTNAME = result.hostname if hasattr(result, 'port'): API_PORT = result.port if hasattr(result, 'path'): API_BASE = result.path session_id = None conn = None def authenticate(username=None, password=None, domain=None): global session_id if username == None: username = conf.get('ldap', 'bind_dn') if password == None: password = conf.get('ldap', 'bind_pw') if domain == None: domain = conf.get('kolab', 'primary_domain') post = json.dumps( { 'username': username, 'password': password, 'domain': domain } ) response = request('POST', "system.authenticate", post=post) if not response: return False if 'session_token' in response: session_id = response['session_token'] return True def connect(uri=None): global conn, API_SSL, API_PORT, API_HOSTNAME, API_BASE if not uri == None: result = urlparse(uri) if hasattr(result, 'scheme') and result.scheme == 'https': API_SSL = True API_PORT = 443 if hasattr(result, 'hostname'): API_HOSTNAME = result.hostname if hasattr(result, 'port'): API_PORT = result.port if hasattr(result, 'path'): API_BASE = result.path if conn == None: if API_SSL: conn = httplib.HTTPSConnection(API_HOSTNAME, API_PORT) else: conn = httplib.HTTPConnection(API_HOSTNAME, API_PORT) conn.connect() return conn def disconnect(quit=False): global conn, session_id if quit and session_id: request('GET', 'system.quit') session_id = None if conn: conn.close() conn = None def domain_add(domain, aliases=[]): dna = conf.get('ldap', 'domain_name_attribute') post = json.dumps({ dna: [ domain ] + aliases }) return request('POST', 'domain.add', post=post) def domain_delete(domain, force=False): domain_id, domain_attrs = domain_find(domain).popitem() param = {} param['id'] = domain_id if force: param['force'] = force post = json.dumps(param) return request('POST', 'domain.delete', post=post) def domain_find(domain): dna = conf.get('ldap', 'domain_name_attribute') get = { dna: domain } return request('GET', 'domain.find', get=get) def domain_info(domain): domain_id, domain_attrs = domain_find(domain) get = { 'id': domain_id } return request('GET', 'domain.info', get=get) def domains_capabilities(): return request('GET', 'domains.capabilities') def domains_list(): return request('GET', 'domains.list') def form_value_generate(params): post = json.dumps(params) return request('POST', 'form_value.generate', post=post) def form_value_generate_password(*args, **kw): return request('GET', 'form_value.generate_password') def form_value_list_options(object_type, object_type_id, attribute): post = json.dumps( { 'object_type': object_type, 'type_id': object_type_id, 'attribute': attribute } ) return request('POST', 'form_value.list_options', post=post) def form_value_select_options(object_type, object_type_id, attribute): post = json.dumps( { 'object_type': object_type, 'type_id': object_type_id, 'attributes': [ attribute ] } ) return request('POST', 'form_value.select_options', post=post) def get_group_input(): group_types = group_types_list() if len(group_types) > 1: for key in group_types: if not key == "status": print("%s) %s" % (key,group_types[key]['name'])) group_type_id = utils.ask_question("Please select the group type") elif len(group_types) > 0: print("Automatically selected the only group type available") group_type_id = group_types.keys()[0] else: print("No group types available") sys.exit(1) if group_type_id in group_types: group_type_info = group_types[group_type_id]['attributes'] else: print("No such group type") sys.exit(1) params = { 'group_type_id': group_type_id } for attribute in group_type_info['form_fields']: params[attribute] = utils.ask_question(attribute) for attribute in group_type_info['auto_form_fields']: - exec("retval = group_form_value_generate_%s(params)" % (attribute)) + retval = eval("group_form_value_generate_%s(params)" % (attribute)) params[attribute] = retval[attribute] return params def get_user_input(): user_types = user_types_list() if user_types['count'] > 1: print("") for key in user_types['list']: if not key == "status": print("%s) %s" % (key,user_types['list'][key]['name'])) print("") user_type_id = utils.ask_question("Please select the user type") elif user_types['count'] > 0: print("Automatically selected the only user type available") user_type_id = user_types['list'].keys()[0] else: print("No user types available") sys.exit(1) if user_type_id in user_types['list']: user_type_info = user_types['list'][user_type_id]['attributes'] else: print("No such user type") sys.exit(1) params = { 'object_type': 'user', 'type_id': user_type_id } must_attrs = [] may_attrs = [] for attribute in user_type_info['form_fields']: if isinstance(user_type_info['form_fields'][attribute], dict): if 'optional' in user_type_info['form_fields'][attribute] and user_type_info['form_fields'][attribute]['optional']: may_attrs.append(attribute) else: must_attrs.append(attribute) else: must_attrs.append(attribute) for attribute in must_attrs: if isinstance(user_type_info['form_fields'][attribute], dict) and \ 'type' in user_type_info['form_fields'][attribute]: if user_type_info['form_fields'][attribute]['type'] == 'select': if 'values' not in user_type_info['form_fields'][attribute]: attribute_values = form_value_select_options('user', user_type_id, attribute) default = '' if 'default' in attribute_values[attribute]: default = attribute_values[attribute]['default'] params[attribute] = utils.ask_menu( "Choose the %s value" % (attribute), attribute_values[attribute]['list'], default=default ) else: default = '' if 'default' in user_type_info['form_fields'][attribute]: default = user_type_info['form_fields'][attribute]['default'] params[attribute] = utils.ask_menu( "Choose the %s value" % (attribute), user_type_info['form_fields'][attribute]['values'], default=default ) else: params[attribute] = utils.ask_question(attribute) else: params[attribute] = utils.ask_question(attribute) for attribute in user_type_info['fields']: params[attribute] = user_type_info['fields'][attribute] - exec("retval = user_form_value_generate(params)") + retval = eval("user_form_value_generate(params)") print(retval) return params def group_add(params=None): if params == None: params = get_group_input() post = json.dumps(params) return request('POST', 'group.add', post=post) def group_delete(params=None): if params == None: params = { 'id': utils.ask_question("Name of group to delete", "group") } post = json.dumps(params) return request('POST', 'group.delete', post=post) def group_form_value_generate_mail(params=None): if params == None: params = get_user_input() params = json.dumps(params) return request('POST', 'group_form_value.generate_mail', params) def group_find(params=None): post = { 'search': { 'params': {} } } for (k,v) in params.items(): post['search']['params'][k] = { 'value': v, 'type': 'exact' } return request('POST', 'group.find', post=json.dumps(post)) def group_info(group=None): if group == None: group = utils.ask_question("group DN") return request('GET', 'group.info', get={ 'id': group }) def group_members_list(group=None): if group == None: group = utils.ask_question("Group email address") group = request('GET', 'group.members_list?group=%s' % (group)) return group def group_types_list(): return request('GET', 'group_types.list') def groups_list(params={}): return request('POST', 'groups.list', post=json.dumps(params)) def ou_add(params={}): return request('POST', 'ou.add', post=json.dumps(params)) def ou_delete(params={}): return request('POST', 'ou.delete', post=json.dumps(params)) def ou_edit(params={}): return request('POST', 'ou.edit', post=json.dumps(params)) def ou_find(params=None): post = { 'search': { 'params': {} } } for (k,v) in params.items(): post['search']['params'][k] = { 'value': v, 'type': 'exact' } return request('POST', 'ou.find', post=json.dumps(post)) def ou_info(ou): _params = { 'id': ou } ou = request('GET', 'ou.info', get=_params) return ou def ous_list(params={}): return request('POST', 'ous.list', post=json.dumps(params)) def request(method, api_uri, get=None, post=None, headers={}): response_data = request_raw(method, api_uri, get, post, headers) if response_data['status'] == "OK": del response_data['status'] return response_data['result'] else: print("%s: %s (code %s)" % (response_data['status'], response_data['reason'], response_data['code'])) return False def request_raw(method, api_uri, get=None, post=None, headers={}, isretry=False): global session_id if not session_id == None: headers["X-Session-Token"] = session_id reconnect = False conn = connect() if conf.debuglevel > 8: conn.set_debuglevel(9) if not get == None: _get = "?%s" % (urllib.urlencode(get)) else: _get = "" log.debug(_("Requesting %r with params %r") % ("%s/%s" % (API_BASE,api_uri), (get, post)), level=8) try: conn.request(method.upper(), "%s/%s%s" % (API_BASE, api_uri, _get), post, headers) response = conn.getresponse() data = response.read() log.debug(_("Got response: %r") % (data), level=8) except (httplib.BadStatusLine, httplib.CannotSendRequest) as e: if isretry: raise e log.info(_("Connection error: %r; re-connecting..."), e) reconnect = True # retry with a new connection if reconnect: disconnect() return request_raw(method, api_uri, get, post, headers, True) try: response_data = json.loads(data) except ValueError: # Some data is not JSON log.error(_("Response data is not JSON")) return response_data def resource_add(params=None): if params == None: params = get_user_input() return request('POST', 'resource.add', post=json.dumps(params)) def resource_delete(params=None): if params == None: params = { 'id': utils.ask_question("Resource DN to delete", "resource") } return request('POST', 'resource.delete', post=json.dumps(params)) def resource_find(params=None): post = { 'search': { 'params': {} } } for (k,v) in params.items(): post['search']['params'][k] = { 'value': v, 'type': 'exact' } return request('POST', 'resource.find', post=json.dumps(post)) def resource_info(resource=None): if resource == None: resource = utils.ask_question("Resource DN") return request('GET', 'resource.info', get={ 'id': resource }) def resource_types_list(): return request('GET', 'resource_types.list') def resources_list(params={}): return request('POST', 'resources.list', post=json.dumps(params)) def role_add(params=None): if params == None: role_name = utils.ask_question("Role name") params = { 'cn': role_name } params = json.dumps(params) return request('POST', 'role.add', params) def role_capabilities(): return request('GET', 'role.capabilities') def role_delete(params=None): if params == None: role_name = utils.ask_question("Role name") role = role_find_by_attribute({'cn': role_name}) params = { 'role': role.keys()[0] } if 'role' not in params: role = role_find_by_attribute(params) params = { 'role': role.keys()[0] } post = json.dumps(params) return request('POST', 'role.delete', post=post) def role_find_by_attribute(params=None): if params == None: role_name = utils.ask_question("Role name") else: role_name = params['cn'] get = { 'cn': role_name } role = request('GET', 'role.find_by_attribute', get=get) return role def role_info(role_name): role = role_find_by_attribute({'cn': role_name}) get = { 'role': role['id'] } role = request('GET', 'role.info', get=get) return role def roles_list(): return request('GET', 'roles.list') def sharedfolder_add(params=None): if params == None: params = get_user_input() return request('POST', 'sharedfolder.add', post=json.dumps(params)) def sharedfolder_delete(params=None): if params == None: params = { 'id': utils.ask_question("Shared Folder DN to delete", "sharedfolder") } return request('POST', 'sharedfolder.delete', post=json.dumps(params)) def sharedfolders_list(params={}): return request('POST', 'sharedfolders.list', post=json.dumps(params)) def system_capabilities(domain=None): return request('GET', 'system.capabilities', get={'domain':domain}) def system_get_domain(): return request('GET', 'system.get_domain') def system_select_domain(domain=None): if domain == None: domain = utils.ask_question("Domain name") get = { 'domain': domain } return request('GET', 'system.select_domain', get=get) def user_add(params=None): if params == None: params = get_user_input() params = json.dumps(params) return request('POST', 'user.add', post=params) def user_delete(params=None): if params == None: params = { 'id': utils.ask_question("Username for user to delete", "user") } post = json.dumps(params) return request('POST', 'user.delete', post=post) def user_edit(user = None, attributes={}): if user == None: get = { 'id': utils.ask_question("Username for user to edit", "user") } else: get = { 'id': user } user_info = request('GET', 'user.info', get=get) for attribute in attributes: user_info[attribute] = attributes[attribute] post = json.dumps(user_info) user_edit = request('POST', 'user.edit', get=get, post=post) return user_edit def user_find(attribs=None): if attribs == None: post = { 'search': { 'params': { utils.ask_question("Attribute") : { 'value': utils.ask_question("value"), 'type': 'exact' } } } } else: post = { 'search': { 'params': {} } } for (k,v) in attribs.items(): post['search']['params'][k] = { 'value': v, 'type': 'exact' } post = json.dumps(post) user = request('POST', 'user.find', post=post) return user def user_form_value_generate(params=None): if params == None: params = get_user_input() post = json.dumps(params) return request('POST', 'form_value.generate', post=post) def user_form_value_generate_uid(params=None): if params == None: params = get_user_input() params = json.dumps(params) return request('POST', 'form_value.generate_uid', params) def user_form_value_generate_userpassword(*args, **kw): result = form_value_generate_password() return { 'userpassword': result['password'] } def user_info(user=None): if user == None: user = utils.ask_question("User email address") _params = { 'id': user } user = request('GET', 'user.info', get=_params) return user def users_list(params={}): return request('POST', 'users.list', post=json.dumps(params)) def user_types_list(): return request('GET', 'user_types.list') diff --git a/tests/functional/test_kolabd/test_001_user_sync.py b/tests/functional/test_kolabd/test_001_user_sync.py index cf8d443..0652d23 100644 --- a/tests/functional/test_kolabd/test_001_user_sync.py +++ b/tests/functional/test_kolabd/test_001_user_sync.py @@ -1,140 +1,140 @@ import time import unittest import pykolab from pykolab import wap_client from pykolab.auth import Auth from pykolab.imap import IMAP conf = pykolab.getConf() class TestKolabDaemon(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): from tests.functional.purge_users import purge_users purge_users() self.user = { 'local': 'john.doe', 'domain': 'example.org' } from tests.functional.user_add import user_add user_add("John", "Doe") @classmethod def teardown_class(self, *args, **kw): from tests.functional.purge_users import purge_users purge_users() def test_001_user_recipient_policy(self): auth = Auth() auth.connect() recipient = auth.find_recipient("%(local)s@%(domain)s" % (self.user)) if hasattr(self, 'assertIsInstance'): self.assertIsInstance(recipient, str) self.assertEqual(recipient, "uid=doe,ou=People,dc=example,dc=org") result = wap_client.user_info(recipient) self.assertEqual(result['mail'], 'john.doe@example.org') self.assertEqual(result['alias'], ['doe@example.org', 'j.doe@example.org']) def test_002_user_recipient_policy_duplicate(self): from tests.functional.user_add import user_add user = { 'local': 'jane.doe', 'domain': 'example.org' } user_add("Jane", "Doe") time.sleep(3) auth = Auth() auth.connect() recipient = auth.find_recipient("%(local)s@%(domain)s" % (user)) if hasattr(self, 'assertIsInstance'): self.assertIsInstance(recipient, str) self.assertEqual(recipient, "uid=doe2,ou=People,dc=example,dc=org") result = wap_client.user_info(recipient) if 'mailhost' not in result: from tests.functional.synchronize import synchronize_once synchronize_once() result = wap_client.user_info(recipient) self.assertEqual(result['mail'], 'jane.doe@example.org') self.assertEqual(result['alias'], ['doe2@example.org', 'j.doe2@example.org']) def test_003_user_mailbox_created(self): time.sleep(2) imap = IMAP() imap.connect() folders = imap.lm('user/%(local)s@%(domain)s' % (self.user)) self.assertEqual(len(folders), 1) def test_004_user_additional_folders_created(self): time.sleep(2) imap = IMAP() imap.connect() ac_folders = conf.get_raw('kolab', 'autocreate_folders') - exec("ac_folders = %s" % (ac_folders)) + ac_folders = eval("%s" % (ac_folders)) folders = imap.lm('user/%(local)s/*@%(domain)s' % (self.user)) self.assertEqual(len(folders), len(ac_folders)) def test_005_user_folders_metadata_set(self): imap = IMAP() imap.connect() ac_folders = conf.get_raw('kolab', 'autocreate_folders') - exec("ac_folders = %s" % (ac_folders)) + ac_folders = eval("%s" % (ac_folders)) folders = [] folders.extend(imap.lm('user/%(local)s@%(domain)s' % (self.user))) folders.extend(imap.lm('user/%(local)s/*@%(domain)s' % (self.user))) for folder in folders: metadata = imap.get_metadata(folder) print(metadata) folder_name = '/'.join(folder.split('/')[2:]).split('@')[0] if folder_name in ac_folders: if 'annotations' in ac_folders[folder_name]: for _annotation in ac_folders[folder_name]['annotations']: if _annotation.startswith('/private'): continue _annotation_value = ac_folders[folder_name]['annotations'][_annotation] self.assertTrue(_annotation in metadata[metadata.keys().pop()]) self.assertEqual(_annotation_value, metadata[metadata.keys().pop()][_annotation]) def test_006_user_subscriptions(self): imap = IMAP() imap.connect(login=False) login = conf.get('cyrus-imap', 'admin_login') password = conf.get('cyrus-imap', 'admin_password') imap.login_plain(login, password, 'john.doe@example.org') folders = imap.lm() self.assertTrue("INBOX" in folders) folders = imap.imap.lsub() self.assertTrue("Calendar" in folders) def test_011_resource_add(self): pass def test_012_resource_mailbox_created(self): pass def test_013_resource_mailbox_annotation(self): pass diff --git a/tests/functional/test_wap_client/test_002_user_add.py b/tests/functional/test_wap_client/test_002_user_add.py index a48b92b..865d64d 100644 --- a/tests/functional/test_wap_client/test_002_user_add.py +++ b/tests/functional/test_wap_client/test_002_user_add.py @@ -1,78 +1,78 @@ import time import unittest import pykolab from pykolab import wap_client from pykolab.auth import Auth from pykolab.imap import IMAP conf = pykolab.getConf() class TestUserAdd(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): from tests.functional.purge_users import purge_users purge_users() self.user = { 'local': 'john.doe', 'domain': 'example.org' } from tests.functional.user_add import user_add user_add("John", "Doe") from tests.functional.synchronize import synchronize_once synchronize_once() @classmethod def teardown_class(self, *args, **kw): from tests.functional.purge_users import purge_users purge_users() def test_001_inbox_created(self): time.sleep(2) imap = IMAP() imap.connect() folders = imap.lm('user/%(local)s@%(domain)s' % (self.user)) self.assertEqual(len(folders), 1) def test_002_autocreate_folders_created(self): time.sleep(2) imap = IMAP() imap.connect() - exec("ac_folders = %s" % (conf.get_raw(conf.get('kolab', 'primary_domain'), 'autocreate_folders'))) + ac_folders = eval("%s" % (conf.get_raw(conf.get('kolab', 'primary_domain'), 'autocreate_folders'))) folders = imap.lm('user/%(local)s/*@%(domain)s' % (self.user)) print(folders) print(ac_folders.keys()) self.assertEqual(len(folders), len(ac_folders)) def test_003_folders_metadata_set(self): imap = IMAP() imap.connect() - exec("ac_folders = %s" % (conf.get_raw(conf.get('kolab', 'primary_domain'), 'autocreate_folders'))) + ac_folders = eval("%s" % (conf.get_raw(conf.get('kolab', 'primary_domain'), 'autocreate_folders'))) folders = [] folders.extend(imap.lm('user/%(local)s@%(domain)s' % (self.user))) folders.extend(imap.lm('user/%(local)s/*@%(domain)s' % (self.user))) for folder in folders: metadata = imap.get_metadata(folder) folder_name = '/'.join(folder.split('/')[2:]).split('@')[0] if folder_name in ac_folders: if 'annotations' in ac_folders[folder_name]: for _annotation in ac_folders[folder_name]['annotations']: if _annotation.startswith('/private/'): continue _annotation_value = ac_folders[folder_name]['annotations'][_annotation] self.assertTrue(_annotation in metadata[metadata.keys().pop()]) self.assertEqual(_annotation_value, metadata[metadata.keys().pop()][_annotation])