Page MenuHomePhorge

D761.1775211557.diff
No OneTemporary

Authored By
Unknown
Size
11 KB
Referenced Files
None
Subscribers
None

D761.1775211557.diff

diff --git a/.arclint b/.arclint
--- a/.arclint
+++ b/.arclint
@@ -19,6 +19,7 @@
"E131": "disabled",
"E201": "disabled",
"E202": "disabled",
+ "E221": "disabled",
"E225": "disabled",
"E231": "disabled",
"E251": "disabled",
diff --git a/pykolab/logger.py b/pykolab/logger.py
--- a/pykolab/logger.py
+++ b/pykolab/logger.py
@@ -35,16 +35,27 @@
self.logger = logger
self.log_level = log_level
self.linebuf = ''
+ self.skip_next = False
def write(self, buf):
- # ugly patch to make smtplib debug logging records appear on one line in log file
+ # ugly patch to make smtplib and smtpd debug logging records appear on one line in log file
# smtplib uses "print>>stderr, var, var" statements for debug logging. These
# statements are splited into separate lines on separating whitespace.
+
for line in buf.rstrip().splitlines():
+ if self.skip_next:
+ self.skip_next = False
+ continue
+
if buf != '\n':
- if line.startswith('send:') or line.startswith('reply:'):
+ linestarts = line.split(':')[0]
+ if linestarts in ['send', 'reply', 'Data', 'recips', 'Peer', 'sender']:
self.linebuf = line
- return
+ elif linestarts.startswith('===>'):
+ # Do not log lines starting with ===>
+ self.linebuf = ''
+ self.skip_next = True
+ continue
else:
self.logger.log(self.log_level, '%s %s', self.linebuf, line.rstrip()[:150])
self.linebuf = ''
diff --git a/wallace/__init__.py b/wallace/__init__.py
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -24,7 +24,7 @@
import os
import pwd
import traceback
-from smtpd import SMTPChannel
+import smtpd
import socket
import struct
import sys
@@ -38,6 +38,7 @@
from pykolab.translate import _
log = pykolab.getLogger('pykolab.wallace')
+sys.stderr = pykolab.logger.StderrToLogger(log)
conf = pykolab.getConf()
from modules import cb_action_ACCEPT
@@ -80,11 +81,18 @@
if continue_with_accept:
cb_action_ACCEPT('wallace', filepath)
+ return continue_with_accept
+
def modules_heartbeat(wallace_modules):
lastrun = 0
- while True:
+ while not multiprocessing.current_process().finished.is_set():
try:
+ log.debug(_("Running %s (pid: %s)") %
+ (multiprocessing.current_process().name, multiprocessing.current_process().pid),
+ level=8
+ )
+
for module in wallace_modules:
try:
modules.heartbeat(module, lastrun)
@@ -92,11 +100,15 @@
log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc())))
lastrun = int(time.time())
- time.sleep(60)
- except (SystemExit, KeyboardInterrupt), e:
- log.info("Terminating heartbeat process")
+ multiprocessing.current_process().finished.wait(60)
+
+ except (SystemExit, KeyboardInterrupt), errmsg:
+ log.warning(_("Exiting %s, %s") %(multiprocessing.current_process().name, errmsg), level=8)
break
+ except Exception, errmsg:
+ log.error(_("Wallace heartbeat failed: %s") % errmsg)
+
def worker_process(*args, **kw):
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
@@ -105,14 +117,18 @@
while True:
while not self.finished.is_set():
self.finished.wait(self.interval)
+ log.debug(_("Timer looping function '%s' every %ss") % (self.function.__name__, self.interval), level=8)
self.function(*self.args, **self.kwargs)
self.finished.set()
+ log.debug(_("Timer loop %s") % ('still active', 'finished')[self.finished.is_set()], level=8)
+ break
class WallaceDaemon(object):
def __init__(self):
self.current_connections = 0
self.max_connections = 24
+ self.parent_pid = None
self.pool = None
daemon_group = conf.add_cli_parser_option_group(_("Daemon Options"))
@@ -144,12 +160,12 @@
)
daemon_group.add_option(
- "--threads",
- dest = "max_threads",
+ "--max-tasks",
+ dest = "max_tasks",
action = "store",
- default = 4,
+ default = 10,
type = int,
- help = _("Number of threads to use.")
+ help = _("Number of tasks per process.")
)
daemon_group.add_option(
@@ -169,6 +185,15 @@
help = _("Port that Wallace is supposed to use.")
)
+ daemon_group.add_option(
+ "--threads",
+ dest = "max_threads",
+ action = "store",
+ default = 4,
+ type = int,
+ help = _("Number of threads to use.")
+ )
+
daemon_group.add_option(
"-u",
"--user",
@@ -187,6 +212,12 @@
conf.process_groupname
)
+ # Enable Python Multiprocess logging
+ if conf.debuglevel > 8:
+ mp_logger = multiprocessing.get_logger()
+ mp_logger.setLevel(multiprocessing.SUBDEBUG)
+ mp_logger.warning('Python Multiprocess logging started')
+
import modules
modules.__init__()
@@ -195,8 +226,10 @@
self.modules = []
def do_wallace(self):
+ self.parent_pid = os.getpid()
+
if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
- self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1)
+ self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), conf.max_tasks)
else:
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ())
@@ -238,26 +271,42 @@
s.listen(5)
- self.timer = Timer(180, self.pickup_spool_messages)
+ self.timer = Timer(180, self.pickup_spool_messages, args=[], kwargs={'sync': True})
self.timer.start()
# start background process to run periodic jobs in active modules
- self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
- self.heartbeat.daemon = True
- self.heartbeat.start()
+ try:
+ self.heartbeat = multiprocessing.Process(target=modules_heartbeat, name="Wallace_Heartbeat", args=[self.modules])
+ self.heartbeat.finished = multiprocessing.Event()
+ self.heartbeat.daemon = True
+ self.heartbeat.start()
+ except Exception, errmsg:
+ log.error(_("Failed to start heartbeat daemon: %s %s") % (Exception, errmsg))
+ else:
+ log.debug(_("Wallace heartbeat is %s") % ('not alive','alive')[self.heartbeat.is_alive()], level=8)
try:
while 1:
while self.current_connections >= self.max_connections:
+ log.debug(_("Reached limit of max connections of: %s. Sleeping for 0.5s") % self.max_connections, level=6)
time.sleep(0.5)
pair = s.accept()
- log.info(_("Accepted connection"))
+ log.debug(_("Accepted connection %r with address %r") % (pair if pair is not None else (None, None)), level=8)
if not pair == None:
self.current_connections += 1
connection, address = pair
- channel = SMTPChannel(self, connection, address)
+
+ _smtpd = smtpd
+ # Set DEBUGSTREAM of smtpd to log to pykolab logger
+ if conf.debuglevel > 8:
+ _smtpd.DEBUGSTREAM = pykolab.logger.StderrToLogger(log)
+
+ log.debug(_("Creating SMTPChannel for accepted message"), level=8)
+ channel = _smtpd.SMTPChannel(self, connection, address)
asyncore.loop()
+ else:
+ log.error(_("Socket accepted, but (conn, address) tuple is None"))
except Exception, errmsg:
traceback.print_exc()
@@ -265,14 +314,17 @@
s.close()
# shut down hearbeat process
+ log.warning(_("About to terminate heartbeat process"))
self.heartbeat.terminate()
+ self.timer.cancel()
+ self.timer.join()
def data_header(self, mailfrom, rcpttos):
COMMASPACE = ', '
return "X-Kolab-From: " + mailfrom + "\r\n" + \
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
- def pickup_spool_messages(self, sync=False):
+ def pickup_spool_messages(self, sync=False, *args, **kwargs):
# Mind you to include the trailing slash
pickup_path = '/var/spool/pykolab/wallace/'
@@ -378,24 +430,53 @@
os.write(fp, data)
os.close(fp)
- self.pool.apply_async(pickup_message, (filename, (self.modules)))
+ log.debug(_("Started processing accepted message %s") % filename, level=8)
- self.current_connections -= 1
-
- return "250 OK Message %s queued" % (filename)
+ try:
+ log.debug(_("There are %s currently active pool child processes") % len(multiprocessing.active_children()), level=8)
+ result = self.pool.apply_async(pickup_message, (filename, (self.modules)))
+ log.debug(_("Wallace cb_action_accept: %s") % result.get(timeout=20), level=8)
+ except TimeoutError, errmsg:
+ log.error(_("Multiprocessing.TimeoutError %s, leaving message for Wallace Timer to pickup") % errmsg)
+ finally:
+ self.current_connections -= 1
+ return "250 OK Message %s queued" % (filename)
def reload_config(self, *args, **kw):
- pass
+ log.error(_("Received signal %s ") % args[0])
+ return
def remove_pid(self, *args, **kw):
- if os.access(conf.pidfile, os.R_OK):
- os.remove(conf.pidfile)
+ try:
+ if os.getpid() == self.parent_pid:
+ log.debug("Stopping process %s" % multiprocessing.current_process().name)
+
+ log.debug(_("Terminating processes pool"), level=8)
+ self.pool.close()
+
+ if hasattr(self, 'timer'):
+ if not self.timer.finished.is_set():
+ log.debug("Canceling Wallace Timer", level=8)
+ self.timer.finished.set()
+ self.timer.cancel()
+
+ log.debug(_("Terminating heartbeat process"), level=8)
+ self.heartbeat.finished.set()
+ self.heartbeat.terminate()
- if self.pool is not None:
- self.pool.close()
- self.pool.join()
+ self.pool.join()
+ self.timer.join(5)
+ self.heartbeat.join(5)
- raise SystemExit
+ if os.access(conf.pidfile, os.R_OK):
+ log.warning(_("Removing PID file %s") % conf.pidfile)
+ os.remove(conf.pidfile)
+
+ log.warning("Exiting!")
+ sys.exit()
+
+ except Exception, errmsg:
+ log.debug("Exception while trying to stop %s: %s" % (multiprocessing.current_process().name, errmsg), level=8)
def run(self):
"""
@@ -508,10 +589,6 @@
os.close(1)
os.close(2)
- os.open(os.devnull, os.O_RDONLY)
- os.open(os.devnull, os.O_WRONLY)
- os.open(os.devnull, os.O_WRONLY)
-
log.remove_stdout_handler()
self.set_signal_handlers()
self.write_pid()

File Metadata

Mime Type
text/plain
Expires
Fri, Apr 3, 10:19 AM (1 h, 59 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18823635
Default Alt Text
D761.1775211557.diff (11 KB)

Event Timeline