Changeset View
Changeset View
Standalone View
Standalone View
wallace/__init__.py
Show All 18 Lines | |||||
import asyncore | import asyncore | ||||
from distutils import version | from distutils import version | ||||
import grp | import grp | ||||
import multiprocessing | import multiprocessing | ||||
import os | import os | ||||
import pwd | import pwd | ||||
import traceback | import traceback | ||||
from smtpd import SMTPChannel | import smtpd | ||||
import socket | import socket | ||||
import struct | import struct | ||||
import sys | import sys | ||||
import tempfile | import tempfile | ||||
from threading import _Timer | from threading import _Timer | ||||
import time | import time | ||||
import traceback | import traceback | ||||
import pykolab | import pykolab | ||||
from pykolab import utils | from pykolab import utils | ||||
from pykolab.translate import _ | from pykolab.translate import _ | ||||
log = pykolab.getLogger('pykolab.wallace') | log = pykolab.getLogger('pykolab.wallace') | ||||
sys.stderr = pykolab.logger.StderrToLogger(log) | |||||
conf = pykolab.getConf() | conf = pykolab.getConf() | ||||
from modules import cb_action_ACCEPT | from modules import cb_action_ACCEPT | ||||
def pickup_message(filepath, *args, **kw): | def pickup_message(filepath, *args, **kw): | ||||
wallace_modules = args[0] | wallace_modules = args[0] | ||||
if kw.has_key('module'): | if kw.has_key('module'): | ||||
Show All 26 Lines | for module in wallace_modules: | ||||
continue_with_accept = False | continue_with_accept = False | ||||
# The message very likely has been consumed by the module that returned False | # The message very likely has been consumed by the module that returned False | ||||
if not os.path.isfile(filepath): | if not os.path.isfile(filepath): | ||||
break | break | ||||
if continue_with_accept: | if continue_with_accept: | ||||
cb_action_ACCEPT('wallace', filepath) | cb_action_ACCEPT('wallace', filepath) | ||||
return continue_with_accept | |||||
def modules_heartbeat(wallace_modules): | def modules_heartbeat(wallace_modules): | ||||
lastrun = 0 | lastrun = 0 | ||||
while True: | while not multiprocessing.current_process().finished.is_set(): | ||||
try: | try: | ||||
log.debug(_("Running %s (pid: %s)") % | |||||
(multiprocessing.current_process().name, multiprocessing.current_process().pid), | |||||
level=8 | |||||
) | |||||
for module in wallace_modules: | for module in wallace_modules: | ||||
try: | try: | ||||
modules.heartbeat(module, lastrun) | modules.heartbeat(module, lastrun) | ||||
except: | except: | ||||
log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc()))) | log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc()))) | ||||
lastrun = int(time.time()) | lastrun = int(time.time()) | ||||
time.sleep(60) | multiprocessing.current_process().finished.wait(60) | ||||
except (SystemExit, KeyboardInterrupt), e: | |||||
log.info("Terminating heartbeat process") | except (SystemExit, KeyboardInterrupt), errmsg: | ||||
log.warning(_("Exiting %s, %s") %(multiprocessing.current_process().name, errmsg), level=8) | |||||
break | break | ||||
except Exception, errmsg: | |||||
log.error(_("Wallace heartbeat failed: %s") % errmsg) | |||||
def worker_process(*args, **kw): | def worker_process(*args, **kw): | ||||
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) | log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) | ||||
class Timer(_Timer): | class Timer(_Timer): | ||||
def run(self): | def run(self): | ||||
while True: | while True: | ||||
while not self.finished.is_set(): | while not self.finished.is_set(): | ||||
self.finished.wait(self.interval) | self.finished.wait(self.interval) | ||||
log.debug(_("Timer looping function '%s' every %ss") % (self.function.__name__, self.interval), level=8) | |||||
self.function(*self.args, **self.kwargs) | self.function(*self.args, **self.kwargs) | ||||
self.finished.set() | self.finished.set() | ||||
log.debug(_("Timer loop %s") % ('still active', 'finished')[self.finished.is_set()], level=8) | |||||
break | |||||
class WallaceDaemon(object): | class WallaceDaemon(object): | ||||
def __init__(self): | def __init__(self): | ||||
self.current_connections = 0 | self.current_connections = 0 | ||||
self.max_connections = 24 | self.max_connections = 24 | ||||
self.parent_pid = None | |||||
self.pool = None | self.pool = None | ||||
daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) | daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--fork", | "--fork", | ||||
dest = "fork_mode", | dest = "fork_mode", | ||||
action = "store_true", | action = "store_true", | ||||
Show All 15 Lines | def __init__(self): | ||||
dest = "process_groupname", | dest = "process_groupname", | ||||
action = "store", | action = "store", | ||||
default = "kolab", | default = "kolab", | ||||
help = _("Run as group GROUPNAME"), | help = _("Run as group GROUPNAME"), | ||||
metavar = "GROUPNAME" | metavar = "GROUPNAME" | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--threads", | "--max-tasks", | ||||
dest = "max_threads", | dest = "max_tasks", | ||||
action = "store", | action = "store", | ||||
default = 4, | default = 10, | ||||
type = int, | type = int, | ||||
help = _("Number of threads to use.") | help = _("Number of tasks per process.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"-p", "--pid-file", | "-p", "--pid-file", | ||||
dest = "pidfile", | dest = "pidfile", | ||||
action = "store", | action = "store", | ||||
default = "/var/run/wallaced/wallaced.pid", | default = "/var/run/wallaced/wallaced.pid", | ||||
help = _("Path to the PID file to use.") | help = _("Path to the PID file to use.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--port", | "--port", | ||||
dest = "wallace_port", | dest = "wallace_port", | ||||
action = "store", | action = "store", | ||||
default = 10026, | default = 10026, | ||||
type = int, | type = int, | ||||
help = _("Port that Wallace is supposed to use.") | help = _("Port that Wallace is supposed to use.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--threads", | |||||
dest = "max_threads", | |||||
action = "store", | |||||
default = 4, | |||||
type = int, | |||||
help = _("Number of threads to use.") | |||||
) | |||||
daemon_group.add_option( | |||||
"-u", | "-u", | ||||
"--user", | "--user", | ||||
dest = "process_username", | dest = "process_username", | ||||
action = "store", | action = "store", | ||||
default = "kolab", | default = "kolab", | ||||
help = _("Run as user USERNAME"), | help = _("Run as user USERNAME"), | ||||
metavar = "USERNAME" | metavar = "USERNAME" | ||||
) | ) | ||||
conf.finalize_conf() | conf.finalize_conf() | ||||
utils.ensure_directory( | utils.ensure_directory( | ||||
os.path.dirname(conf.pidfile), | os.path.dirname(conf.pidfile), | ||||
conf.process_username, | conf.process_username, | ||||
conf.process_groupname | conf.process_groupname | ||||
) | ) | ||||
# Enable Python Multiprocess logging | |||||
if conf.debuglevel > 8: | |||||
mp_logger = multiprocessing.get_logger() | |||||
mp_logger.setLevel(multiprocessing.SUBDEBUG) | |||||
mp_logger.warning('Python Multiprocess logging started') | |||||
import modules | import modules | ||||
modules.__init__() | modules.__init__() | ||||
self.modules = conf.get_list('wallace', 'modules') | self.modules = conf.get_list('wallace', 'modules') | ||||
if self.modules == None: | if self.modules == None: | ||||
self.modules = [] | self.modules = [] | ||||
def do_wallace(self): | def do_wallace(self): | ||||
self.parent_pid = os.getpid() | |||||
if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): | if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): | ||||
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1) | self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), conf.max_tasks) | ||||
else: | else: | ||||
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ()) | self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ()) | ||||
self.pickup_spool_messages(sync=True) | self.pickup_spool_messages(sync=True) | ||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | ||||
Show All 25 Lines | def do_wallace(self): | ||||
time.sleep(1) | time.sleep(1) | ||||
s.close() | s.close() | ||||
time.sleep(1) | time.sleep(1) | ||||
s.listen(5) | s.listen(5) | ||||
self.timer = Timer(180, self.pickup_spool_messages) | self.timer = Timer(180, self.pickup_spool_messages, args=[], kwargs={'sync': True}) | ||||
self.timer.start() | self.timer.start() | ||||
# start background process to run periodic jobs in active modules | # start background process to run periodic jobs in active modules | ||||
self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules]) | try: | ||||
self.heartbeat = multiprocessing.Process(target=modules_heartbeat, name="Wallace_Heartbeat", args=[self.modules]) | |||||
self.heartbeat.finished = multiprocessing.Event() | |||||
self.heartbeat.daemon = True | self.heartbeat.daemon = True | ||||
self.heartbeat.start() | self.heartbeat.start() | ||||
except Exception, errmsg: | |||||
log.error(_("Failed to start heartbeat daemon: %s %s") % (Exception, errmsg)) | |||||
else: | |||||
log.debug(_("Wallace heartbeat is %s") % ('not alive','alive')[self.heartbeat.is_alive()], level=8) | |||||
try: | try: | ||||
while 1: | while 1: | ||||
while self.current_connections >= self.max_connections: | while self.current_connections >= self.max_connections: | ||||
log.debug(_("Reached limit of max connections of: %s. Sleeping for 0.5s") % self.max_connections, level=6) | |||||
time.sleep(0.5) | time.sleep(0.5) | ||||
pair = s.accept() | pair = s.accept() | ||||
log.info(_("Accepted connection")) | log.debug(_("Accepted connection %r with address %r") % (pair if pair is not None else (None, None)), level=8) | ||||
if not pair == None: | if not pair == None: | ||||
self.current_connections += 1 | self.current_connections += 1 | ||||
connection, address = pair | connection, address = pair | ||||
channel = SMTPChannel(self, connection, address) | |||||
_smtpd = smtpd | |||||
# Set DEBUGSTREAM of smtpd to log to pykolab logger | |||||
if conf.debuglevel > 8: | |||||
_smtpd.DEBUGSTREAM = pykolab.logger.StderrToLogger(log) | |||||
log.debug(_("Creating SMTPChannel for accepted message"), level=8) | |||||
channel = _smtpd.SMTPChannel(self, connection, address) | |||||
asyncore.loop() | asyncore.loop() | ||||
else: | |||||
log.error(_("Socket accepted, but (conn, address) tuple is None")) | |||||
except Exception, errmsg: | except Exception, errmsg: | ||||
traceback.print_exc() | traceback.print_exc() | ||||
s.shutdown(1) | s.shutdown(1) | ||||
s.close() | s.close() | ||||
# shut down hearbeat process | # shut down hearbeat process | ||||
log.warning(_("About to terminate heartbeat process")) | |||||
self.heartbeat.terminate() | self.heartbeat.terminate() | ||||
self.timer.cancel() | |||||
self.timer.join() | |||||
def data_header(self, mailfrom, rcpttos): | def data_header(self, mailfrom, rcpttos): | ||||
COMMASPACE = ', ' | COMMASPACE = ', ' | ||||
return "X-Kolab-From: " + mailfrom + "\r\n" + \ | return "X-Kolab-From: " + mailfrom + "\r\n" + \ | ||||
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n" | "X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n" | ||||
def pickup_spool_messages(self, sync=False): | def pickup_spool_messages(self, sync=False, *args, **kwargs): | ||||
# Mind you to include the trailing slash | # Mind you to include the trailing slash | ||||
pickup_path = '/var/spool/pykolab/wallace/' | pickup_path = '/var/spool/pykolab/wallace/' | ||||
messages = [] | messages = [] | ||||
for root, directory, files in os.walk(pickup_path): | for root, directory, files in os.walk(pickup_path): | ||||
for filename in files: | for filename in files: | ||||
messages.append((root, filename)) | messages.append((root, filename)) | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | def process_message(self, peer, mailfrom, rcpttos, data): | ||||
# @TODO: and add line separator (\n or \r\n?) | # @TODO: and add line separator (\n or \r\n?) | ||||
# we should make sure there's only one line separator between | # we should make sure there's only one line separator between | ||||
# kolab headers and the original message (data) | # kolab headers and the original message (data) | ||||
os.write(fp, header); | os.write(fp, header); | ||||
os.write(fp, data) | os.write(fp, data) | ||||
os.close(fp) | os.close(fp) | ||||
self.pool.apply_async(pickup_message, (filename, (self.modules))) | log.debug(_("Started processing accepted message %s") % filename, level=8) | ||||
try: | |||||
log.debug(_("There are %s currently active pool child processes") % len(multiprocessing.active_children()), level=8) | |||||
result = self.pool.apply_async(pickup_message, (filename, (self.modules))) | |||||
log.debug(_("Wallace cb_action_accept: %s") % result.get(timeout=20), level=8) | |||||
except TimeoutError, errmsg: | |||||
log.error(_("Multiprocessing.TimeoutError %s, leaving message for Wallace Timer to pickup") % errmsg) | |||||
finally: | |||||
self.current_connections -= 1 | self.current_connections -= 1 | ||||
return "250 OK Message %s queued" % (filename) | return "250 OK Message %s queued" % (filename) | ||||
def reload_config(self, *args, **kw): | def reload_config(self, *args, **kw): | ||||
pass | log.error(_("Received signal %s ") % args[0]) | ||||
return | |||||
def remove_pid(self, *args, **kw): | def remove_pid(self, *args, **kw): | ||||
if os.access(conf.pidfile, os.R_OK): | try: | ||||
os.remove(conf.pidfile) | if os.getpid() == self.parent_pid: | ||||
log.debug("Stopping process %s" % multiprocessing.current_process().name) | |||||
if self.pool is not None: | log.debug(_("Terminating processes pool"), level=8) | ||||
self.pool.close() | self.pool.close() | ||||
if hasattr(self, 'timer'): | |||||
if not self.timer.finished.is_set(): | |||||
log.debug("Canceling Wallace Timer", level=8) | |||||
self.timer.finished.set() | |||||
self.timer.cancel() | |||||
log.debug(_("Terminating heartbeat process"), level=8) | |||||
self.heartbeat.finished.set() | |||||
self.heartbeat.terminate() | |||||
self.pool.join() | self.pool.join() | ||||
self.timer.join(5) | |||||
self.heartbeat.join(5) | |||||
raise SystemExit | if os.access(conf.pidfile, os.R_OK): | ||||
log.warning(_("Removing PID file %s") % conf.pidfile) | |||||
os.remove(conf.pidfile) | |||||
log.warning("Exiting!") | |||||
sys.exit() | |||||
except Exception, errmsg: | |||||
log.debug("Exception while trying to stop %s: %s" % (multiprocessing.current_process().name, errmsg), level=8) | |||||
def run(self): | def run(self): | ||||
""" | """ | ||||
Run the Wallace daemon. | Run the Wallace daemon. | ||||
""" | """ | ||||
exitcode = 0 | exitcode = 0 | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | def run(self): | ||||
sys.stderr.flush() | sys.stderr.flush() | ||||
sys.stdout.flush() | sys.stdout.flush() | ||||
os.close(0) | os.close(0) | ||||
os.close(1) | os.close(1) | ||||
os.close(2) | os.close(2) | ||||
os.open(os.devnull, os.O_RDONLY) | |||||
os.open(os.devnull, os.O_WRONLY) | |||||
os.open(os.devnull, os.O_WRONLY) | |||||
log.remove_stdout_handler() | log.remove_stdout_handler() | ||||
self.set_signal_handlers() | self.set_signal_handlers() | ||||
self.write_pid() | self.write_pid() | ||||
self.do_wallace() | self.do_wallace() | ||||
except SystemExit, e: | except SystemExit, e: | ||||
exitcode = e | exitcode = e | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
Show All 31 Lines |