Changeset View
Changeset View
Standalone View
Standalone View
wallace/__init__.py
Show First 20 Lines • Show All 86 Lines • ▼ Show 20 Lines | while True: | ||||
for module in wallace_modules: | for module in wallace_modules: | ||||
try: | try: | ||||
modules.heartbeat(module, lastrun) | modules.heartbeat(module, lastrun) | ||||
except: | except: | ||||
log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc()))) | log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc()))) | ||||
lastrun = int(time.time()) | lastrun = int(time.time()) | ||||
time.sleep(60) | time.sleep(60) | ||||
except (SystemExit, KeyboardInterrupt), e: | except KeyboardInterrupt: | ||||
log.info("Terminating heartbeat process") | |||||
break | |||||
except SystemExit: | |||||
log.info("Terminating heartbeat process") | log.info("Terminating heartbeat process") | ||||
break | break | ||||
def worker_process(*args, **kw): | def worker_process(*args, **kw): | ||||
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) | log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) | ||||
class WallaceDaemon(object): | class WallaceDaemon(object): | ||||
def __init__(self): | def __init__(self): | ||||
self.current_connections = 0 | self.current_connections = 0 | ||||
self.max_connections = 24 | self.max_connections = 24 | ||||
self.pool = None | self.pool = None | ||||
daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) | daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--fork", | "--fork", | ||||
dest = "fork_mode", | dest = "fork_mode", | ||||
action = "store_true", | action = "store_true", | ||||
default = False, | default = False, | ||||
help = _("Fork to the background.") | help = _("Fork to the background.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"-b", "--bind", | "-b", "--bind", | ||||
dest = "wallace_bind_address", | dest = "wallace_bind_address", | ||||
action = "store", | action = "store", | ||||
default = "localhost", | default = "localhost", | ||||
help = _("Bind address for Wallace.") | help = _("Bind address for Wallace.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"-g", | "-g", | ||||
"--group", | "--group", | ||||
dest = "process_groupname", | dest = "process_groupname", | ||||
action = "store", | action = "store", | ||||
default = "kolab", | default = "kolab", | ||||
help = _("Run as group GROUPNAME"), | help = _("Run as group GROUPNAME"), | ||||
metavar = "GROUPNAME" | metavar = "GROUPNAME" | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--threads", | "--threads", | ||||
dest = "max_threads", | dest = "max_threads", | ||||
action = "store", | action = "store", | ||||
default = 4, | default = 4, | ||||
type = int, | type = int, | ||||
help = _("Number of threads to use.") | help = _("Number of threads to use.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"-p", "--pid-file", | "-p", "--pid-file", | ||||
dest = "pidfile", | dest = "pidfile", | ||||
action = "store", | action = "store", | ||||
default = "/var/run/wallaced/wallaced.pid", | default = "/var/run/wallaced/wallaced.pid", | ||||
help = _("Path to the PID file to use.") | help = _("Path to the PID file to use.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--port", | "--port", | ||||
dest = "wallace_port", | dest = "wallace_port", | ||||
action = "store", | action = "store", | ||||
default = 10026, | default = 10026, | ||||
type = int, | type = int, | ||||
help = _("Port that Wallace is supposed to use.") | help = _("Port that Wallace is supposed to use.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"-u", | "-u", | ||||
"--user", | "--user", | ||||
dest = "process_username", | dest = "process_username", | ||||
action = "store", | action = "store", | ||||
default = "kolab", | default = "kolab", | ||||
help = _("Run as user USERNAME"), | help = _("Run as user USERNAME"), | ||||
metavar = "USERNAME" | metavar = "USERNAME" | ||||
) | ) | ||||
conf.finalize_conf() | conf.finalize_conf() | ||||
utils.ensure_directory( | utils.ensure_directory( | ||||
os.path.dirname(conf.pidfile), | os.path.dirname(conf.pidfile), | ||||
conf.process_username, | conf.process_username, | ||||
conf.process_groupname | conf.process_groupname | ||||
) | ) | ||||
import modules | import modules | ||||
modules.__init__() | modules.__init__() | ||||
self.modules = conf.get_list('wallace', 'modules') | self.modules = conf.get_list('wallace', 'modules') | ||||
if self.modules == None: | if self.modules == None: | ||||
self.modules = [] | self.modules = [] | ||||
Show All 11 Lines | def do_wallace(self): | ||||
while not bound: | while not bound: | ||||
try: | try: | ||||
if shutdown: | if shutdown: | ||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | ||||
s.bind((conf.wallace_bind_address, conf.wallace_port)) | s.bind((conf.wallace_bind_address, conf.wallace_port)) | ||||
bound = True | bound = True | ||||
except Exception, e: | except Exception: | ||||
log.warning( | log.warning( | ||||
_("Could not bind to socket on port %d on bind " + \ | _("Could not bind to socket on port %d on bind " + \ | ||||
"address %s") % ( | "address %s") % ( | ||||
conf.wallace_port, | conf.wallace_port, | ||||
conf.wallace_bind_address | conf.wallace_bind_address | ||||
) | ) | ||||
) | ) | ||||
while not shutdown: | while not shutdown: | ||||
try: | try: | ||||
s.shutdown(socket.SHUT_RDWR) | s.shutdown(socket.SHUT_RDWR) | ||||
shutdown = True | shutdown = True | ||||
except Exception, e: | except Exception: | ||||
log.warning(_("Could not shut down socket")) | log.warning(_("Could not shut down socket")) | ||||
time.sleep(1) | time.sleep(1) | ||||
s.close() | s.close() | ||||
time.sleep(1) | time.sleep(1) | ||||
s.listen(5) | s.listen(5) | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def do_wallace(self): | ||||
pair = s.accept() | pair = s.accept() | ||||
log.info(_("Accepted connection")) | log.info(_("Accepted connection")) | ||||
if not pair == None: | if not pair == None: | ||||
self.current_connections += 1 | self.current_connections += 1 | ||||
connection, address = pair | connection, address = pair | ||||
channel = SMTPChannel(self, connection, address) | channel = SMTPChannel(self, connection, address) | ||||
asyncore.loop() | asyncore.loop() | ||||
except Exception, errmsg: | except Exception: | ||||
traceback.print_exc() | traceback.print_exc() | ||||
s.shutdown(1) | s.shutdown(1) | ||||
s.close() | s.close() | ||||
# shut down hearbeat process | # shut down hearbeat process | ||||
self.heartbeat.terminate() | self.heartbeat.terminate() | ||||
def data_header(self, mailfrom, rcpttos): | def data_header(self, mailfrom, rcptto): | ||||
COMMASPACE = ', ' | COMMASPACE = ', ' | ||||
return "X-Kolab-From: " + mailfrom + "\r\n" + \ | return "X-Kolab-From: " + mailfrom + "\r\n" + \ | ||||
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n" | "X-Kolab-To: " + rcptto + "\r\n" | ||||
def process_message(self, peer, mailfrom, rcpttos, data): | def process_message(self, peer, mailfrom, rcpttos, data): | ||||
""" | """ | ||||
We have retrieved the message. This should be as fast as possible, | We have retrieved the message. This should be as fast as possible, | ||||
and not ever block. | and not ever block. | ||||
""" | """ | ||||
header = self.data_header(mailfrom, rcpttos) | for rcptto in rcpttos: | ||||
header = self.data_header(mailfrom, rcptto) | |||||
(fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/") | (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/") | ||||
# @TODO: and add line separator (\n or \r\n?) | # @TODO: and add line separator (\n or \r\n?) | ||||
# we should make sure there's only one line separator between | # we should make sure there's only one line separator between | ||||
# kolab headers and the original message (data) | # kolab headers and the original message (data) | ||||
os.write(fp, header); | os.write(fp, header) | ||||
os.write(fp, data) | os.write(fp, data) | ||||
os.close(fp) | os.close(fp) | ||||
self.pool.apply_async(pickup_message, (filename, (self.modules))) | self.pool.apply_async(pickup_message, (filename, (self.modules))) | ||||
self.current_connections -= 1 | self.current_connections -= 1 | ||||
return "250 OK Message %s queued" % (filename) | return "250 OK Message %s queued" % (filename) | ||||
def reload_config(self, *args, **kw): | def reload_config(self, *args, **kw): | ||||
pass | pass | ||||
Show All 13 Lines | def run(self): | ||||
""" | """ | ||||
exitcode = 0 | exitcode = 0 | ||||
try: | try: | ||||
try: | try: | ||||
(ruid, euid, suid) = os.getresuid() | (ruid, euid, suid) = os.getresuid() | ||||
(rgid, egid, sgid) = os.getresgid() | (rgid, egid, sgid) = os.getresgid() | ||||
except AttributeError, errmsg: | except AttributeError: | ||||
ruid = os.getuid() | ruid = os.getuid() | ||||
rgid = os.getgid() | rgid = os.getgid() | ||||
if ruid == 0: | if ruid == 0: | ||||
# Means we can setreuid() / setregid() / setgroups() | # Means we can setreuid() / setregid() / setgroups() | ||||
if rgid == 0: | if rgid == 0: | ||||
# Get group entry details | # Get group entry details | ||||
try: | try: | ||||
▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | def run(self): | ||||
os.close(1) | os.close(1) | ||||
os.close(2) | os.close(2) | ||||
log.remove_stdout_handler() | log.remove_stdout_handler() | ||||
self.set_signal_handlers() | self.set_signal_handlers() | ||||
self.write_pid() | self.write_pid() | ||||
self.do_wallace() | self.do_wallace() | ||||
except SystemExit, e: | except SystemExit: | ||||
exitcode = e | exitcode = e | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
exitcode = 1 | exitcode = 1 | ||||
log.info(_("Interrupted by user")) | log.info(_("Interrupted by user")) | ||||
except AttributeError, e: | except AttributeError: | ||||
exitcode = 1 | exitcode = 1 | ||||
traceback.print_exc() | traceback.print_exc() | ||||
print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org") | print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org") | ||||
except TypeError, e: | except TypeError: | ||||
exitcode = 1 | exitcode = 1 | ||||
traceback.print_exc() | traceback.print_exc() | ||||
log.error(_("Type Error: %s") % e) | log.error(_("Type Error: %s") % e) | ||||
except: | except: | ||||
exitcode = 2 | exitcode = 2 | ||||
traceback.print_exc() | traceback.print_exc() | ||||
print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org") | print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org") | ||||
Show All 15 Lines |