Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117759586
D761.1775211557.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
11 KB
Referenced Files
None
Subscribers
None
D761.1775211557.diff
View Options
diff --git a/.arclint b/.arclint
--- a/.arclint
+++ b/.arclint
@@ -19,6 +19,7 @@
"E131": "disabled",
"E201": "disabled",
"E202": "disabled",
+ "E221": "disabled",
"E225": "disabled",
"E231": "disabled",
"E251": "disabled",
diff --git a/pykolab/logger.py b/pykolab/logger.py
--- a/pykolab/logger.py
+++ b/pykolab/logger.py
@@ -35,16 +35,27 @@
self.logger = logger
self.log_level = log_level
self.linebuf = ''
+ self.skip_next = False
def write(self, buf):
- # ugly patch to make smtplib debug logging records appear on one line in log file
+ # ugly patch to make smtplib and smtpd debug logging records appear on one line in log file
# smtplib uses "print>>stderr, var, var" statements for debug logging. These
# statements are splited into separate lines on separating whitespace.
+
for line in buf.rstrip().splitlines():
+ if self.skip_next:
+ self.skip_next = False
+ continue
+
if buf != '\n':
- if line.startswith('send:') or line.startswith('reply:'):
+ linestarts = line.split(':')[0]
+ if linestarts in ['send', 'reply', 'Data', 'recips', 'Peer', 'sender']:
self.linebuf = line
- return
+ elif linestarts.startswith('===>'):
+ # Do not log lines starting with ===>
+ self.linebuf = ''
+ self.skip_next = True
+ continue
else:
self.logger.log(self.log_level, '%s %s', self.linebuf, line.rstrip()[:150])
self.linebuf = ''
diff --git a/wallace/__init__.py b/wallace/__init__.py
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -24,7 +24,7 @@
import os
import pwd
import traceback
-from smtpd import SMTPChannel
+import smtpd
import socket
import struct
import sys
@@ -38,6 +38,7 @@
from pykolab.translate import _
log = pykolab.getLogger('pykolab.wallace')
+sys.stderr = pykolab.logger.StderrToLogger(log)
conf = pykolab.getConf()
from modules import cb_action_ACCEPT
@@ -80,11 +81,18 @@
if continue_with_accept:
cb_action_ACCEPT('wallace', filepath)
+ return continue_with_accept
+
def modules_heartbeat(wallace_modules):
lastrun = 0
- while True:
+ while not multiprocessing.current_process().finished.is_set():
try:
+ log.debug(_("Running %s (pid: %s)") %
+ (multiprocessing.current_process().name, multiprocessing.current_process().pid),
+ level=8
+ )
+
for module in wallace_modules:
try:
modules.heartbeat(module, lastrun)
@@ -92,11 +100,15 @@
log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc())))
lastrun = int(time.time())
- time.sleep(60)
- except (SystemExit, KeyboardInterrupt), e:
- log.info("Terminating heartbeat process")
+ multiprocessing.current_process().finished.wait(60)
+
+ except (SystemExit, KeyboardInterrupt), errmsg:
+ log.warning(_("Exiting %s, %s") %(multiprocessing.current_process().name, errmsg), level=8)
break
+ except Exception, errmsg:
+ log.error(_("Wallace heartbeat failed: %s") % errmsg)
+
def worker_process(*args, **kw):
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
@@ -105,14 +117,18 @@
while True:
while not self.finished.is_set():
self.finished.wait(self.interval)
+ log.debug(_("Timer looping function '%s' every %ss") % (self.function.__name__, self.interval), level=8)
self.function(*self.args, **self.kwargs)
self.finished.set()
+ log.debug(_("Timer loop %s") % ('still active', 'finished')[self.finished.is_set()], level=8)
+ break
class WallaceDaemon(object):
def __init__(self):
self.current_connections = 0
self.max_connections = 24
+ self.parent_pid = None
self.pool = None
daemon_group = conf.add_cli_parser_option_group(_("Daemon Options"))
@@ -144,12 +160,12 @@
)
daemon_group.add_option(
- "--threads",
- dest = "max_threads",
+ "--max-tasks",
+ dest = "max_tasks",
action = "store",
- default = 4,
+ default = 10,
type = int,
- help = _("Number of threads to use.")
+ help = _("Number of tasks per process.")
)
daemon_group.add_option(
@@ -169,6 +185,15 @@
help = _("Port that Wallace is supposed to use.")
)
+ daemon_group.add_option(
+ "--threads",
+ dest = "max_threads",
+ action = "store",
+ default = 4,
+ type = int,
+ help = _("Number of threads to use.")
+ )
+
daemon_group.add_option(
"-u",
"--user",
@@ -187,6 +212,12 @@
conf.process_groupname
)
+ # Enable Python Multiprocess logging
+ if conf.debuglevel > 8:
+ mp_logger = multiprocessing.get_logger()
+ mp_logger.setLevel(multiprocessing.SUBDEBUG)
+ mp_logger.warning('Python Multiprocess logging started')
+
import modules
modules.__init__()
@@ -195,8 +226,10 @@
self.modules = []
def do_wallace(self):
+ self.parent_pid = os.getpid()
+
if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
- self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1)
+ self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), conf.max_tasks)
else:
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ())
@@ -238,26 +271,42 @@
s.listen(5)
- self.timer = Timer(180, self.pickup_spool_messages)
+ self.timer = Timer(180, self.pickup_spool_messages, args=[], kwargs={'sync': True})
self.timer.start()
# start background process to run periodic jobs in active modules
- self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
- self.heartbeat.daemon = True
- self.heartbeat.start()
+ try:
+ self.heartbeat = multiprocessing.Process(target=modules_heartbeat, name="Wallace_Heartbeat", args=[self.modules])
+ self.heartbeat.finished = multiprocessing.Event()
+ self.heartbeat.daemon = True
+ self.heartbeat.start()
+ except Exception, errmsg:
+ log.error(_("Failed to start heartbeat daemon: %s %s") % (Exception, errmsg))
+ else:
+ log.debug(_("Wallace heartbeat is %s") % ('not alive','alive')[self.heartbeat.is_alive()], level=8)
try:
while 1:
while self.current_connections >= self.max_connections:
+ log.debug(_("Reached limit of max connections of: %s. Sleeping for 0.5s") % self.max_connections, level=6)
time.sleep(0.5)
pair = s.accept()
- log.info(_("Accepted connection"))
+ log.debug(_("Accepted connection %r with address %r") % (pair if pair is not None else (None, None)), level=8)
if not pair == None:
self.current_connections += 1
connection, address = pair
- channel = SMTPChannel(self, connection, address)
+
+ _smtpd = smtpd
+ # Set DEBUGSTREAM of smtpd to log to pykolab logger
+ if conf.debuglevel > 8:
+ _smtpd.DEBUGSTREAM = pykolab.logger.StderrToLogger(log)
+
+ log.debug(_("Creating SMTPChannel for accepted message"), level=8)
+ channel = _smtpd.SMTPChannel(self, connection, address)
asyncore.loop()
+ else:
+ log.error(_("Socket accepted, but (conn, address) tuple is None"))
except Exception, errmsg:
traceback.print_exc()
@@ -265,14 +314,17 @@
s.close()
# shut down hearbeat process
+ log.warning(_("About to terminate heartbeat process"))
self.heartbeat.terminate()
+ self.timer.cancel()
+ self.timer.join()
def data_header(self, mailfrom, rcpttos):
COMMASPACE = ', '
return "X-Kolab-From: " + mailfrom + "\r\n" + \
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
- def pickup_spool_messages(self, sync=False):
+ def pickup_spool_messages(self, sync=False, *args, **kwargs):
# Mind you to include the trailing slash
pickup_path = '/var/spool/pykolab/wallace/'
@@ -378,24 +430,53 @@
os.write(fp, data)
os.close(fp)
- self.pool.apply_async(pickup_message, (filename, (self.modules)))
+ log.debug(_("Started processing accepted message %s") % filename, level=8)
- self.current_connections -= 1
-
- return "250 OK Message %s queued" % (filename)
+ try:
+ log.debug(_("There are %s currently active pool child processes") % len(multiprocessing.active_children()), level=8)
+ result = self.pool.apply_async(pickup_message, (filename, (self.modules)))
+ log.debug(_("Wallace cb_action_accept: %s") % result.get(timeout=20), level=8)
+ except TimeoutError, errmsg:
+ log.error(_("Multiprocessing.TimeoutError %s, leaving message for Wallace Timer to pickup") % errmsg)
+ finally:
+ self.current_connections -= 1
+ return "250 OK Message %s queued" % (filename)
def reload_config(self, *args, **kw):
- pass
+ log.error(_("Received signal %s ") % args[0])
+ return
def remove_pid(self, *args, **kw):
- if os.access(conf.pidfile, os.R_OK):
- os.remove(conf.pidfile)
+ try:
+ if os.getpid() == self.parent_pid:
+ log.debug("Stopping process %s" % multiprocessing.current_process().name)
+
+ log.debug(_("Terminating processes pool"), level=8)
+ self.pool.close()
+
+ if hasattr(self, 'timer'):
+ if not self.timer.finished.is_set():
+ log.debug("Canceling Wallace Timer", level=8)
+ self.timer.finished.set()
+ self.timer.cancel()
+
+ log.debug(_("Terminating heartbeat process"), level=8)
+ self.heartbeat.finished.set()
+ self.heartbeat.terminate()
- if self.pool is not None:
- self.pool.close()
- self.pool.join()
+ self.pool.join()
+ self.timer.join(5)
+ self.heartbeat.join(5)
- raise SystemExit
+ if os.access(conf.pidfile, os.R_OK):
+ log.warning(_("Removing PID file %s") % conf.pidfile)
+ os.remove(conf.pidfile)
+
+ log.warning("Exiting!")
+ sys.exit()
+
+ except Exception, errmsg:
+ log.debug("Exception while trying to stop %s: %s" % (multiprocessing.current_process().name, errmsg), level=8)
def run(self):
"""
@@ -508,10 +589,6 @@
os.close(1)
os.close(2)
- os.open(os.devnull, os.O_RDONLY)
- os.open(os.devnull, os.O_WRONLY)
- os.open(os.devnull, os.O_WRONLY)
-
log.remove_stdout_handler()
self.set_signal_handlers()
self.write_pid()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Apr 3, 10:19 AM (1 h, 59 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18823635
Default Alt Text
D761.1775211557.diff (11 KB)
Attached To
Mode
D761: Python smtpd library can now log debug information to pykolab log stream (debug level 9)
Attached
Detach File
Event Timeline