diff --git a/.arclint b/.arclint --- a/.arclint +++ b/.arclint @@ -19,6 +19,7 @@ "E131": "disabled", "E201": "disabled", "E202": "disabled", + "E221": "disabled", "E225": "disabled", "E231": "disabled", "E251": "disabled", diff --git a/pykolab/logger.py b/pykolab/logger.py --- a/pykolab/logger.py +++ b/pykolab/logger.py @@ -35,16 +35,27 @@ self.logger = logger self.log_level = log_level self.linebuf = '' + self.skip_next = False def write(self, buf): - # ugly patch to make smtplib debug logging records appear on one line in log file + # ugly patch to make smtplib and smtpd debug logging records appear on one line in log file # smtplib uses "print>>stderr, var, var" statements for debug logging. These # statements are splited into separate lines on separating whitespace. + for line in buf.rstrip().splitlines(): + if self.skip_next: + self.skip_next = False + continue + if buf != '\n': - if line.startswith('send:') or line.startswith('reply:'): + linestarts = line.split(':')[0] + if linestarts in ['send', 'reply', 'Data', 'recips', 'Peer', 'sender']: self.linebuf = line - return + elif linestarts.startswith('===>'): + # Do not log lines starting with ====> + self.linebuf = '' + self.skip_next = True + continue else: self.logger.log(self.log_level, '%s %s', self.linebuf, line.rstrip()[:150]) self.linebuf = '' diff --git a/wallace/__init__.py b/wallace/__init__.py --- a/wallace/__init__.py +++ b/wallace/__init__.py @@ -24,7 +24,7 @@ import os import pwd import traceback -from smtpd import SMTPChannel +import smtpd import socket import struct import sys @@ -38,6 +38,7 @@ from pykolab.translate import _ log = pykolab.getLogger('pykolab.wallace') +sys.stderr = pykolab.logger.StderrToLogger(log) conf = pykolab.getConf() from modules import cb_action_ACCEPT @@ -80,11 +81,18 @@ if continue_with_accept: cb_action_ACCEPT('wallace', filepath) + return continue_with_accept + def modules_heartbeat(wallace_modules): lastrun = 0 - while True: + while not multiprocessing.current_process().finished.is_set(): try: + log.debug(_("Running %s (pid: %s)") % + (multiprocessing.current_process().name, multiprocessing.current_process().pid), + level=8 + ) + for module in wallace_modules: try: modules.heartbeat(module, lastrun) @@ -92,11 +100,15 @@ log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc()))) lastrun = int(time.time()) - time.sleep(60) - except (SystemExit, KeyboardInterrupt), e: - log.info("Terminating heartbeat process") + multiprocessing.current_process().finished.wait(60) + + except (SystemExit, KeyboardInterrupt), errmsg: + log.warning(_("Exiting %s, %s") %(multiprocessing.current_process().name, errmsg), level=8) break + except Exception, errmsg: + log.error(_("Wallace heartbeat failed: %s") % errmsg) + def worker_process(*args, **kw): log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) @@ -105,14 +117,18 @@ while True: while not self.finished.is_set(): self.finished.wait(self.interval) + log.debug(_("Timer looping function '%s' every %ss") % (self.function.__name__, self.interval), level=8) self.function(*self.args, **self.kwargs) self.finished.set() + log.debug(_("Timer loop %s") % ('still active', 'finished')[self.finished.is_set()], level=8) + break class WallaceDaemon(object): def __init__(self): self.current_connections = 0 self.max_connections = 24 + self.parent_pid = None self.pool = None daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) @@ -144,12 +160,12 @@ ) daemon_group.add_option( - "--threads", - dest = "max_threads", + "--max-tasks", + dest = "max_tasks", action = "store", - default = 4, + default = 10, type = int, - help = _("Number of threads to use.") + help = _("Number of tasks per process.") ) daemon_group.add_option( @@ -169,6 +185,15 @@ help = _("Port that Wallace is supposed to use.") ) + daemon_group.add_option( + "--threads", + dest = "max_threads", + action = "store", + default = 4, + type = int, + help = _("Number of threads to use.") + ) + daemon_group.add_option( "-u", "--user", @@ -187,6 +212,12 @@ conf.process_groupname ) + # Enable Python Multiprocess logging + if conf.debuglevel > 8: + mp_logger = multiprocessing.get_logger() + mp_logger.setLevel(multiprocessing.SUBDEBUG) + mp_logger.warning('Python Multiprocess logging started') + import modules modules.__init__() @@ -195,8 +226,10 @@ self.modules = [] def do_wallace(self): + self.parent_pid = os.getpid() + if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): - self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1) + self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), conf.max_tasks) else: self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ()) @@ -238,26 +271,42 @@ s.listen(5) - self.timer = Timer(180, self.pickup_spool_messages) + self.timer = Timer(180, self.pickup_spool_messages, args=[], kwargs={'sync': True}) self.timer.start() # start background process to run periodic jobs in active modules - self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules]) - self.heartbeat.daemon = True - self.heartbeat.start() + try: + self.heartbeat = multiprocessing.Process(target=modules_heartbeat, name="Wallace_Heartbeat", args=[self.modules]) + self.heartbeat.finished = multiprocessing.Event() + self.heartbeat.daemon = True + self.heartbeat.start() + except Exception, errmsg: + log.error(_("Failed to start heartbeat daemon: %s %s") % (Exception, errmsg)) + else: + log.debug(_("Wallace heartbeat is %s") % ('not alive','alive')[self.heartbeat.is_alive()], level=8) try: while 1: while self.current_connections >= self.max_connections: + log.debug(_("Reached limit of max connections of: %s. Sleeping for 0.5s") % self.max_connections, level=6) time.sleep(0.5) pair = s.accept() - log.info(_("Accepted connection")) + log.debug(_("Accepted connection %r with address %r") % (pair if pair is not None else (None, None)), level=8) if not pair == None: self.current_connections += 1 connection, address = pair - channel = SMTPChannel(self, connection, address) + + _smtpd = smtpd + # Set DEBUGSTREAM of smtpd to log to pykolab logger + if conf.debuglevel > 8: + _smtpd.DEBUGSTREAM = pykolab.logger.StderrToLogger(log) + + log.debug(_("Creating SMTPChannel for accepted message"), level=8) + channel = _smtpd.SMTPChannel(self, connection, address) asyncore.loop() + else: + log.error(_("Socket accepted, but (conn, address) tuple is None")) except Exception, errmsg: traceback.print_exc() @@ -265,14 +314,17 @@ s.close() # shut down hearbeat process + log.warning(_("About to terminate heartbeat process")) self.heartbeat.terminate() + self.timer.cancel() + self.timer.join() def data_header(self, mailfrom, rcpttos): COMMASPACE = ', ' return "X-Kolab-From: " + mailfrom + "\r\n" + \ "X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n" - def pickup_spool_messages(self, sync=False): + def pickup_spool_messages(self, sync=False, *args, **kwargs): # Mind you to include the trailing slash pickup_path = '/var/spool/pykolab/wallace/' @@ -378,24 +430,53 @@ os.write(fp, data) os.close(fp) - self.pool.apply_async(pickup_message, (filename, (self.modules))) + log.debug(_("Started processing accepted message %s") % filename, level=8) - self.current_connections -= 1 - - return "250 OK Message %s queued" % (filename) + try: + log.debug(_("There are %s currently active pool child processes") % len(multiprocessing.active_children()), level=8) + result = self.pool.apply_async(pickup_message, (filename, (self.modules))) + log.debug(_("Wallace cb_action_accept: %s") % result.get(timeout=20), level=8) + except TimeoutError, errmsg: + log.error(_("Multiprocessing.TimeoutError %s, leaving message for Wallace Timer to pickup") % errmsg) + finally: + self.current_connections -= 1 + return "250 OK Message %s queued" % (filename) def reload_config(self, *args, **kw): - pass + log.error(_("Received signal %s ") % args[0]) + return def remove_pid(self, *args, **kw): - if os.access(conf.pidfile, os.R_OK): - os.remove(conf.pidfile) + try: + if os.getpid() == self.parent_pid: + log.debug("Stopping process %s" % multiprocessing.current_process().name) + + log.debug(_("Terminating processes pool"), level=8) + self.pool.close() + + if hasattr(self, 'timer'): + if not self.timer.finished.is_set(): + log.debug("Canceling Wallace Timer", level=8) + self.timer.finished.set() + self.timer.cancel() + + log.debug(_("Terminating heartbeat process"), level=8) + self.heartbeat.finished.set() + self.heartbeat.terminate() - if self.pool is not None: - self.pool.close() - self.pool.join() + self.pool.join() + self.timer.join(5) + self.heartbeat.join(5) - raise SystemExit + if os.access(conf.pidfile, os.R_OK): + log.warning(_("Removing PID file %s") % conf.pidfile) + os.remove(conf.pidfile) + + log.warning("Exiting!") + sys.exit() + + except Exception, errmsg: + log.debug("Exception while trying to stop %s: %s" % (multiprocessing.current_process().name, errmsg), level=8) def run(self): """ @@ -508,10 +589,6 @@ os.close(1) os.close(2) - os.open(os.devnull, os.O_RDONLY) - os.open(os.devnull, os.O_WRONLY) - os.open(os.devnull, os.O_WRONLY) - log.remove_stdout_handler() self.set_signal_handlers() self.write_pid()